Revert "[ASTERIXDB-3574][STO] Taking resource-level lock instead of global lock"
Reason for Revert: causing some correctness issue while caching/uncaching files
Ext-ref: MB-65695
Change-Id: I0bc0afaccaaf3519fa6b51df06b6077933f91461
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19691
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 5035d25..269f6f7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -284,6 +284,7 @@
long lsn = -1;
ILSMIndex index = null;
LocalResource localResource = null;
+ DatasetLocalResource localResourceMetadata = null;
boolean foundWinner = false;
JobEntityCommits jobEntityWinners = null;
@@ -353,11 +354,12 @@
//if index is not registered into IndexLifeCycleManager,
//create the index using LocalMetadata stored in LocalResourceRepository
//get partition path in this node
+ localResourceMetadata = (DatasetLocalResource) localResource.getResource();
index = (ILSMIndex) datasetLifecycleManager.get(localResource.getPath());
if (index == null) {
//#. create index instance and register to indexLifeCycleManager
- index = (ILSMIndex) datasetLifecycleManager.registerIfAbsent(localResource.getPath(),
- null);
+ index = (ILSMIndex) localResourceMetadata.createInstance(serviceCtx);
+ datasetLifecycleManager.register(localResource.getPath(), index);
datasetLifecycleManager.open(localResource.getPath());
try {
maxDiskLastLsn = getResourceLowWaterMark(localResource, datasetLifecycleManager,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 2803f21..8d40acb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -19,10 +19,10 @@
package org.apache.asterix.common.context;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.asterix.common.api.IIOBlockingOperation;
@@ -60,8 +60,8 @@
private boolean durable;
public DatasetInfo(int datasetID, ILogManager logManager) {
- this.partitionIndexes = new ConcurrentHashMap<>();
- this.indexes = new ConcurrentHashMap<>();
+ this.partitionIndexes = new HashMap<>();
+ this.indexes = new HashMap<>();
this.partitionPendingIO = new Int2IntOpenHashMap();
this.setLastAccess(-1);
this.datasetID = datasetID;
@@ -203,13 +203,13 @@
return Collections.unmodifiableMap(indexes);
}
- public void addIndex(long resourceID, IndexInfo indexInfo) {
+ public synchronized void addIndex(long resourceID, IndexInfo indexInfo) {
indexes.put(resourceID, indexInfo);
partitionIndexes.computeIfAbsent(indexInfo.getPartition(), partition -> new HashSet<>()).add(indexInfo);
LOGGER.debug("registered reference to index {}", indexInfo);
}
- public void removeIndex(long resourceID) {
+ public synchronized void removeIndex(long resourceID) {
IndexInfo info = indexes.remove(resourceID);
if (info != null) {
partitionIndexes.get(info.getPartition()).remove(info);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 0128297..96f7241 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -23,13 +23,10 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.IntPredicate;
import java.util.function.Predicate;
@@ -65,16 +62,13 @@
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.IResource;
import org.apache.hyracks.storage.common.LocalResource;
import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
import org.apache.hyracks.storage.common.buffercache.SleepRateLimiter;
import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
-import org.apache.hyracks.util.annotations.ThreadSafe;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-@ThreadSafe
public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeCycleComponent {
private static final Logger LOGGER = LogManager.getLogger();
@@ -88,8 +82,6 @@
private final LogRecord waitLog;
protected final IDiskResourceCacheLockNotifier lockNotifier;
private volatile boolean stopped = false;
- private final ReentrantReadWriteLock stopLock;
- private final Map<String, ReentrantReadWriteLock> resourceLocks;
private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
// all LSM-trees share the same virtual buffer cache list
private final List<IVirtualBufferCache> vbcs;
@@ -104,7 +96,6 @@
this.storageProperties = storageProperties;
this.resourceRepository = resourceRepository;
this.vbc = vbc;
- this.stopLock = new ReentrantReadWriteLock();
int numMemoryComponents = storageProperties.getMemoryComponentsNum();
this.vbcs = new ArrayList<>(numMemoryComponents);
for (int i = 0; i < numMemoryComponents; i++) {
@@ -112,14 +103,13 @@
}
this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
this.lockNotifier = lockNotifier;
- this.resourceLocks = Collections.synchronizedMap(new WeakHashMap<>());
waitLog = new LogRecord();
waitLog.setLogType(LogType.WAIT_FOR_FLUSHES);
waitLog.computeAndSetLogSize();
}
@Override
- public ILSMIndex get(String resourcePath) throws HyracksDataException {
+ public synchronized ILSMIndex get(String resourcePath) throws HyracksDataException {
validateDatasetLifecycleManagerState();
int datasetID = getDIDfromResourcePath(resourcePath);
long resourceID = getResourceIDfromResourcePath(resourcePath);
@@ -127,7 +117,7 @@
}
@Override
- public ILSMIndex getIndex(int datasetID, long resourceID) throws HyracksDataException {
+ public synchronized ILSMIndex getIndex(int datasetID, long resourceID) throws HyracksDataException {
validateDatasetLifecycleManagerState();
DatasetResource datasetResource = datasets.get(datasetID);
if (datasetResource == null) {
@@ -137,52 +127,16 @@
}
@Override
- public IIndex registerIfAbsent(String resourcePath, IIndex index) throws HyracksDataException {
- stopLock.readLock().lock();
- try {
- validateDatasetLifecycleManagerState();
-
- IIndex existingIndex = get(resourcePath);
- if (existingIndex != null) {
- return existingIndex;
- }
-
- ReentrantReadWriteLock resourceLock = getResourceLock(resourcePath);
- resourceLock.writeLock().lock();
- try {
- existingIndex = get(resourcePath);
- if (existingIndex != null) {
- return existingIndex;
- }
-
- if (index == null) {
- index = getOrCreateIndex(resourcePath);
- }
-
- int datasetID = getDIDfromResourcePath(resourcePath);
- LocalResource resource = resourceRepository.get(resourcePath);
- lockNotifier.onRegister(resource, index);
-
- DatasetResource datasetResource = datasets.get(datasetID);
- if (datasetResource == null) {
- datasetResource = getDatasetLifecycle(datasetID);
- }
-
- datasetResource.register(resource, (ILSMIndex) index);
- } finally {
- resourceLock.writeLock().unlock();
- }
- } finally {
- stopLock.readLock().unlock();
+ public synchronized void register(String resourcePath, IIndex index) throws HyracksDataException {
+ validateDatasetLifecycleManagerState();
+ int did = getDIDfromResourcePath(resourcePath);
+ LocalResource resource = resourceRepository.get(resourcePath);
+ DatasetResource datasetResource = datasets.get(did);
+ lockNotifier.onRegister(resource, index);
+ if (datasetResource == null) {
+ datasetResource = getDatasetLifecycle(did);
}
-
- return index;
- }
-
- private ReentrantReadWriteLock getResourceLock(String resourcePath) {
- // create fair locks inorder to avoid starving.
- // actually I believe there shouldn't be any kinda starving as the separate locks are created for each resource.
- return resourceLocks.computeIfAbsent(resourcePath, k -> new ReentrantReadWriteLock(true));
+ datasetResource.register(resource, (ILSMIndex) index);
}
protected int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
@@ -209,261 +163,115 @@
return (DatasetLocalResource) lr.getResource();
}
- /**
- * Concurrency considerations for dataset operations:
- *
- * This method requires dataset locks to handle concurrent operations:
- *
- * 1. Dataset-level locking:
- * - The open() method works on all indexes of a dataset.
- * - The first-time open() call triggers recoverIndex(), which acquires the dataset lock.
- * - Race conditions may occur if index recovery happens while an index is being dropped.
- *
- * 2. Lock acquisition strategy:
- * - Both the dataset lock and the local resource lock must be acquired.
- * - This prevents register/open operations from occurring while an unregister operation is in progress.
- * - These locks must be held until the unregister operation completes.
- *
- * 3. Possible race scenarios:
- * Consider the following threads:
- * - t1: unregister(ds/0/idx1)
- * - t2: open(ds/0/idx1)
- * - t3: unregister(ds/0/idx2)
- * - t4: unregister(newDs/0/idx)
- * - If t2 is running, t1 and t3 should wait (same dataset), but t4 can proceed (different dataset).
- * - If t2 hasn't started yet (dataset lock not acquired), t1 and t3 could execute.
- *
- * 4. Race condition handling:
- * - If t1 starts unregistering, t2 starts opening, and t3 starts unregistering simultaneously:
- * - t2 takes the dataset lock.
- * - Depending on the timing of registration completion, the resource repository may or may not contain the index.
- * - If the index does not exist, an INDEX_DOES_NOT_EXIST error will occur.
- *
- * Note: At the compilation layer, exclusive dataset locks prevent DROP/UNREGISTER operations
- * from colliding with OPEN/REGISTER operations.
- */
@Override
- public void unregister(String resourcePath) throws HyracksDataException {
- stopLock.readLock().lock();
- try {
- validateDatasetLifecycleManagerState();
- String datasetPartitionPath = StoragePathUtil.getDatasetPartitionPath(resourcePath);
+ public synchronized void unregister(String resourcePath) throws HyracksDataException {
+ validateDatasetLifecycleManagerState();
+ int did = getDIDfromResourcePath(resourcePath);
+ long resourceID = getResourceIDfromResourcePath(resourcePath);
+ DatasetResource dsr = datasets.get(did);
+ IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
- ReentrantReadWriteLock partitionResourceLock = getResourceLock(datasetPartitionPath);
- ReentrantReadWriteLock resourceLock = getResourceLock(resourcePath);
- partitionResourceLock.writeLock().lock();
- try {
- resourceLock.writeLock().lock();
- try {
- int did = getDIDfromResourcePath(resourcePath);
- long resourceID = getResourceIDfromResourcePath(resourcePath);
- DatasetResource dsr = datasets.get(did);
- IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
+ if (dsr == null || iInfo == null) {
+ throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
+ }
- if (dsr == null || iInfo == null) {
- throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
- }
-
- lockNotifier.onUnregister(resourceID);
- PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
- if (iInfo.getReferenceCount() != 0
- || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
- if (LOGGER.isErrorEnabled()) {
- final String logMsg = String.format(
- "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)",
- resourcePath, iInfo.getReferenceCount(), opTracker.getNumActiveOperations());
- LOGGER.error(logMsg);
- }
- throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX,
- StoragePathUtil.getIndexNameFromPath(resourcePath));
- }
-
- // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
- DatasetInfo dsInfo = dsr.getDatasetInfo();
- dsInfo.waitForIO();
- closeIndex(iInfo);
- dsInfo.removeIndex(resourceID);
- synchronized (dsInfo) {
- int referenceCount = dsInfo.getReferenceCount();
- boolean open = dsInfo.isOpen();
- boolean empty = dsInfo.getIndexes().isEmpty();
- if (referenceCount == 0 && open && empty && !dsInfo.isExternal()) {
- LOGGER.debug("removing dataset {} from cache", dsInfo.getDatasetID());
- removeDatasetFromCache(dsInfo.getDatasetID());
- } else {
- LOGGER.debug("keeping dataset {} in cache, ref count {}, open {}, indexes count: {}",
- dsInfo.getDatasetID(), referenceCount, open, dsInfo.getIndexes().size());
- }
- }
- } finally {
- resourceLock.writeLock().unlock();
- }
- } finally {
- partitionResourceLock.writeLock().unlock();
+ lockNotifier.onUnregister(resourceID);
+ PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
+ if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
+ if (LOGGER.isErrorEnabled()) {
+ final String logMsg = String.format(
+ "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)",
+ resourcePath, iInfo.getReferenceCount(), opTracker.getNumActiveOperations());
+ LOGGER.error(logMsg);
}
- } finally {
- stopLock.readLock().unlock();
+ throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX,
+ StoragePathUtil.getIndexNameFromPath(resourcePath));
+ }
+
+ // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
+ DatasetInfo dsInfo = dsr.getDatasetInfo();
+ dsInfo.waitForIO();
+ closeIndex(iInfo);
+ dsInfo.removeIndex(resourceID);
+ synchronized (dsInfo) {
+ int referenceCount = dsInfo.getReferenceCount();
+ boolean open = dsInfo.isOpen();
+ boolean empty = dsInfo.getIndexes().isEmpty();
+ if (referenceCount == 0 && open && empty && !dsInfo.isExternal()) {
+ LOGGER.debug("removing dataset {} from cache", dsInfo.getDatasetID());
+ removeDatasetFromCache(dsInfo.getDatasetID());
+ } else {
+ LOGGER.debug("keeping dataset {} in cache, ref count {}, open {}, indexes count: {}",
+ dsInfo.getDatasetID(), referenceCount, open, dsInfo.getIndexes().size());
+ }
}
}
@Override
- public void destroy(String resourcePath) throws HyracksDataException {
- stopLock.readLock().lock();
+ public synchronized void open(String resourcePath) throws HyracksDataException {
+ validateDatasetLifecycleManagerState();
+ DatasetLocalResource localResource = getDatasetLocalResource(resourcePath);
+ if (localResource == null) {
+ throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
+ }
+ int did = getDIDfromResourcePath(resourcePath);
+ long resourceID = getResourceIDfromResourcePath(resourcePath);
+
+ lockNotifier.onOpen(resourceID);
try {
- ReentrantReadWriteLock resourceLock = getResourceLock(resourcePath);
- resourceLock.writeLock().lock();
- try {
- LOGGER.info("Dropping index {} on node {}", resourcePath, serviceCtx.getNodeId());
- IIndex index = get(resourcePath);
- if (index != null) {
- unregister(resourcePath);
- } else {
- index = readIndex(resourcePath);
- }
- if (getResourceId(resourcePath) != -1) {
- resourceRepository.delete(resourcePath);
- }
- index.destroy();
- } finally {
- resourceLock.writeLock().unlock();
+ DatasetResource datasetResource = datasets.get(did);
+ int partition = localResource.getPartition();
+ if (shouldRecoverLazily(datasetResource, partition)) {
+ performLocalRecovery(resourcePath, datasetResource, partition);
+ } else {
+ openResource(resourcePath, false);
}
} finally {
- stopLock.readLock().unlock();
+ lockNotifier.onClose(resourceID);
}
}
- private long getResourceId(String resourcePath) throws HyracksDataException {
- LocalResource lr = resourceRepository.get(resourcePath);
- return lr == null ? -1 : lr.getId();
- }
-
- @Override
- public void open(String resourcePath) throws HyracksDataException {
- stopLock.readLock().lock();
- try {
- validateDatasetLifecycleManagerState();
-
- DatasetLocalResource localResource = getDatasetLocalResource(resourcePath);
- if (localResource == null) {
- throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
- }
-
- int did = getDIDfromResourcePath(resourcePath);
- long resourceID = getResourceIDfromResourcePath(resourcePath);
-
- // Notify before opening a resource
- lockNotifier.onOpen(resourceID);
- try {
- DatasetResource datasetResource = datasets.get(did);
- int partition = localResource.getPartition();
- boolean lazyRecover = shouldRecoverLazily(datasetResource, partition);
-
- if (!lazyRecover) {
- openResource(resourcePath, false);
- return;
- }
-
- // Perform local recovery by taking a lock on the root resource
- boolean recoveredByCurrentThread = performLocalRecovery(resourcePath, datasetResource, partition);
-
- /*
- * Concurrent Access Scenario for Index Resource Recovery:
- * ------------------------------------------------------
- * When multiple threads attempt to open the same index resource, a race
- * condition can occur.
- *
- * Example:
- * - Thread1 (handling ds/0/idx1) and Thread2 (handling ds/0/idx2) may both
- * try to recover dataset indexes.
- * - Thread1 enters `ensureIndexOpenAndConsistent()`, detects that the resource
- * is not recovered, and starts recovery.
- * - Before Thread1 marks recovery as complete, Thread2 also checks and finds
- * the resource not recovered, so it attempts recovery too.
- *
- * However, index recovery is an expensive operation and should be performed by
- * only one thread. To prevent multiple recoveries, `ensureIndexOpenAndConsistent()`
- * must be synchronized on the root dataset resource. This ensures that only one
- * thread performs the recovery, while others wait.
- *
- * Behavior After Synchronization:
- * - Thread1 recovers and marks the index as recovered.
- * - Thread2, after acquiring the lock, sees that the index is already recovered.
- * - It returns `false` from `ensureIndexOpenAndConsistent()` and proceeds to call
- * `open()`.
- * - Since `open()` is idempotent, if the index is already open (`iInfo.isOpen()`),
- * it does nothing except incrementing open stats.
- */
- if (!recoveredByCurrentThread) {
- openResource(resourcePath, false);
- }
- } finally {
- lockNotifier.onClose(resourceID);
- }
- } finally {
- stopLock.readLock().unlock();
- }
- }
-
- private boolean performLocalRecovery(String resourcePath, DatasetResource datasetResource, int partition)
+ private void performLocalRecovery(String resourcePath, DatasetResource datasetResource, int partition)
throws HyracksDataException {
+ LOGGER.debug("performing local recovery for dataset {} partition {}", datasetResource.getDatasetInfo(),
+ partition);
+ FileReference indexRootRef = StoragePathUtil.getIndexRootPath(serviceCtx.getIoManager(), resourcePath);
+ Map<Long, LocalResource> resources = resourceRepository.getResources(r -> true, List.of(indexRootRef));
- String indexRootRefPath = StoragePathUtil.getDatasetPartitionPath(resourcePath);
- ReentrantReadWriteLock resourceLock = getResourceLock(indexRootRefPath);
- FileReference indexRootRef = serviceCtx.getIoManager().resolve(indexRootRefPath);
- resourceLock.writeLock().lock();
- try {
- if (!shouldRecoverLazily(datasetResource, partition)) {
- return false;
- }
- LOGGER.debug("performing local recovery for dataset {} partition {}", datasetResource.getDatasetInfo(),
- partition);
- Map<Long, LocalResource> resources = resourceRepository.getResources(r -> true, List.of(indexRootRef));
-
- List<ILSMIndex> indexes = new ArrayList<>();
- for (LocalResource resource : resources.values()) {
- if (shouldSkipRecoveringResource(resource)) {
- continue;
- }
-
- ILSMIndex index = (ILSMIndex) registerIfAbsent(resource.getPath(), null);
- boolean undoTouch = !resourcePath.equals(resource.getPath());
- openResource(resource.getPath(), undoTouch);
- indexes.add(index);
+ List<ILSMIndex> indexes = new ArrayList<>();
+ for (LocalResource resource : resources.values()) {
+ if (shouldSkipResource(resource)) {
+ continue;
}
- if (!indexes.isEmpty()) {
- recoveryMgr.recoverIndexes(indexes);
- }
-
- datasetResource.markRecovered(partition);
- return true;
- } finally {
- resourceLock.writeLock().unlock();
+ ILSMIndex index = getOrCreateIndex(resource);
+ boolean undoTouch = !resourcePath.equals(resource.getPath());
+ openResource(resource.getPath(), undoTouch);
+ indexes.add(index);
}
+
+ if (!indexes.isEmpty()) {
+ recoveryMgr.recoverIndexes(indexes);
+ }
+
+ datasetResource.markRecovered(partition);
}
- private boolean shouldSkipRecoveringResource(LocalResource resource) {
+ private boolean shouldSkipResource(LocalResource resource) {
DatasetLocalResource lr = (DatasetLocalResource) resource.getResource();
return MetadataIndexImmutableProperties.isMetadataDataset(lr.getDatasetId())
|| (lr.getResource() instanceof LSMBTreeLocalResource
&& ((LSMBTreeLocalResource) lr.getResource()).isSecondaryNoIncrementalMaintenance());
}
- private IIndex getOrCreateIndex(String resourcePath) throws HyracksDataException {
- IIndex index = get(resourcePath);
- if (index != null) {
- return index;
+ private ILSMIndex getOrCreateIndex(LocalResource resource) throws HyracksDataException {
+ ILSMIndex index = get(resource.getPath());
+ if (index == null) {
+ DatasetLocalResource lr = (DatasetLocalResource) resource.getResource();
+ index = (ILSMIndex) lr.createInstance(serviceCtx);
+ register(resource.getPath(), index);
}
- return readIndex(resourcePath);
- }
-
- private IIndex readIndex(String resourcePath) throws HyracksDataException {
- LocalResource lr = resourceRepository.get(resourcePath);
- if (lr == null) {
- throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
- }
- IResource resource = lr.getResource();
- return resource.createInstance(serviceCtx);
+ return index;
}
private void openResource(String resourcePath, boolean undoTouch) throws HyracksDataException {
@@ -492,15 +300,11 @@
boolean indexTouched = false;
try {
if (!iInfo.isOpen()) {
- synchronized (iInfo) {
- if (!iInfo.isOpen()) {
- ILSMOperationTracker opTracker = iInfo.getIndex().getOperationTracker();
- synchronized (opTracker) {
- iInfo.getIndex().activate();
- }
- iInfo.setOpen(true);
- }
+ ILSMOperationTracker opTracker = iInfo.getIndex().getOperationTracker();
+ synchronized (opTracker) {
+ iInfo.getIndex().activate();
}
+ iInfo.setOpen(true);
}
iInfo.touch();
indexTouched = true;
@@ -530,7 +334,15 @@
if (dsr != null) {
return dsr;
}
- return datasets.computeIfAbsent(did, k -> new DatasetResource(new DatasetInfo(did, logManager)));
+ synchronized (datasets) {
+ dsr = datasets.get(did);
+ if (dsr == null) {
+ DatasetInfo dsInfo = new DatasetInfo(did, logManager);
+ dsr = new DatasetResource(dsInfo);
+ datasets.put(did, dsr);
+ }
+ return dsr;
+ }
}
@Override
@@ -539,62 +351,46 @@
}
@Override
- public void close(String resourcePath) throws HyracksDataException {
- stopLock.readLock().lock();
+ public synchronized void close(String resourcePath) throws HyracksDataException {
+ DatasetResource dsr = null;
+ IndexInfo iInfo = null;
try {
- DatasetResource dsr = null;
- IndexInfo iInfo = null;
-
- // A resource lock may not be necessary if the unregister case does not need handling.
- ReentrantReadWriteLock resourceLock = getResourceLock(resourcePath);
- resourceLock.writeLock().lock();
- try {
- validateDatasetLifecycleManagerState();
-
- int did = getDIDfromResourcePath(resourcePath);
- long resourceID = getResourceIDfromResourcePath(resourcePath);
-
- dsr = datasets.get(did);
- if (dsr == null) {
- throw HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, resourceID);
- }
-
- iInfo = dsr.getIndexInfo(resourceID);
- if (iInfo == null) {
- throw HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, resourceID);
- }
-
- lockNotifier.onClose(resourceID);
- } finally {
- // Regardless of any exceptions thrown in the try block (e.g., missing index),
- // we must ensure that the index and dataset are marked as untouched.
- if (iInfo != null) {
- iInfo.untouch();
- }
- if (dsr != null) {
- dsr.untouch();
- }
- resourceLock.writeLock().unlock();
+ validateDatasetLifecycleManagerState();
+ int did = getDIDfromResourcePath(resourcePath);
+ long resourceID = getResourceIDfromResourcePath(resourcePath);
+ dsr = datasets.get(did);
+ if (dsr == null) {
+ throw HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, resourceID);
}
+ iInfo = dsr.getIndexInfo(resourceID);
+ if (iInfo == null) {
+ throw HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, resourceID);
+ }
+ lockNotifier.onClose(resourceID);
} finally {
- stopLock.readLock().unlock();
- }
- }
-
- @Override
- public List<IIndex> getOpenResources() {
- synchronized (datasets) {
- List<IndexInfo> openIndexesInfo = getOpenIndexesInfo();
- List<IIndex> openIndexes = new ArrayList<>();
- for (IndexInfo iInfo : openIndexesInfo) {
- openIndexes.add(iInfo.getIndex());
+ // Regardless of what exception is thrown in the try-block (e.g., line 279),
+ // we have to un-touch the index and dataset.
+ if (iInfo != null) {
+ iInfo.untouch();
}
- return openIndexes;
+ if (dsr != null) {
+ dsr.untouch();
+ }
}
}
@Override
- public List<IndexInfo> getOpenIndexesInfo() {
+ public synchronized List<IIndex> getOpenResources() {
+ List<IndexInfo> openIndexesInfo = getOpenIndexesInfo();
+ List<IIndex> openIndexes = new ArrayList<>();
+ for (IndexInfo iInfo : openIndexesInfo) {
+ openIndexes.add(iInfo.getIndex());
+ }
+ return openIndexes;
+ }
+
+ @Override
+ public synchronized List<IndexInfo> getOpenIndexesInfo() {
List<IndexInfo> openIndexesInfo = new ArrayList<>();
for (DatasetResource dsr : datasets.values()) {
for (IndexInfo iInfo : dsr.getIndexes().values()) {
@@ -616,50 +412,27 @@
}
@Override
- public PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition, String resourcePath) {
+ public synchronized PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition, String path) {
DatasetResource dataset = getDatasetLifecycle(datasetId);
PrimaryIndexOperationTracker opTracker = dataset.getOpTracker(partition);
- if (opTracker != null) {
- return opTracker;
- }
- ReentrantReadWriteLock resourceLock = getResourceLock(resourcePath);
- resourceLock.writeLock().lock();
- try {
+ if (opTracker == null) {
+ populateOpTrackerAndIdGenerator(dataset, partition, path);
opTracker = dataset.getOpTracker(partition);
- if (opTracker != null) {
- return opTracker;
- }
- populateOpTrackerAndIdGenerator(dataset, partition, resourcePath);
- opTracker = dataset.getOpTracker(partition);
- return opTracker;
- } finally {
- resourceLock.writeLock().unlock();
}
+ return opTracker;
}
@Override
- public ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition, String path) {
- DatasetResource dataset = getDatasetLifecycle(datasetId);
+ public synchronized ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition, String path) {
+ DatasetResource dataset = datasets.get(datasetId);
ILSMComponentIdGenerator generator = dataset.getComponentIdGenerator(partition);
- if (generator != null) {
- return generator;
- }
- ReentrantReadWriteLock resourceLock = getResourceLock(path);
- resourceLock.writeLock().lock();
- try {
- generator = dataset.getComponentIdGenerator(partition);
- if (generator != null) {
- return generator;
- }
+ if (generator == null) {
populateOpTrackerAndIdGenerator(dataset, partition, path);
generator = dataset.getComponentIdGenerator(partition);
- return generator;
- } finally {
- resourceLock.writeLock().unlock();
}
+ return generator;
}
- // this function is not being used.
@Override
public synchronized IRateLimiter getRateLimiter(int datasetId, int partition, long writeRateLimit) {
DatasetResource dataset = datasets.get(datasetId);
@@ -671,7 +444,7 @@
}
@Override
- public boolean isRegistered(int datasetId) {
+ public synchronized boolean isRegistered(int datasetId) {
return datasets.containsKey(datasetId);
}
@@ -703,39 +476,34 @@
}
@Override
- public void flushAllDatasets() throws HyracksDataException {
+ public synchronized void flushAllDatasets() throws HyracksDataException {
flushAllDatasets(partition -> true);
}
@Override
- public void flushAllDatasets(IntPredicate partitions) throws HyracksDataException {
- synchronized (datasets) {
- for (DatasetResource dsr : datasets.values()) {
- if (dsr.getDatasetInfo().isOpen()) {
- flushDatasetOpenIndexes(dsr, partitions, false);
- }
+ public synchronized void flushAllDatasets(IntPredicate partitions) throws HyracksDataException {
+ for (DatasetResource dsr : datasets.values()) {
+ if (dsr.getDatasetInfo().isOpen()) {
+ flushDatasetOpenIndexes(dsr, partitions, false);
}
}
}
@Override
- public void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException {
- synchronized (datasets) {
- DatasetResource dsr = datasets.get(datasetId);
- if (dsr != null) {
- flushDatasetOpenIndexes(dsr, p -> true, asyncFlush);
- }
+ public synchronized void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException {
+ DatasetResource dsr = datasets.get(datasetId);
+ if (dsr != null) {
+ flushDatasetOpenIndexes(dsr, p -> true, asyncFlush);
}
}
@Override
- public void asyncFlushMatchingIndexes(Predicate<ILSMIndex> indexPredicate) throws HyracksDataException {
- synchronized (datasets) {
- for (DatasetResource dsr : datasets.values()) {
- for (PrimaryIndexOperationTracker opTracker : dsr.getOpTrackers()) {
- synchronized (opTracker) {
- asyncFlush(dsr, opTracker, indexPredicate);
- }
+ public synchronized void asyncFlushMatchingIndexes(Predicate<ILSMIndex> indexPredicate)
+ throws HyracksDataException {
+ for (DatasetResource dsr : datasets.values()) {
+ for (PrimaryIndexOperationTracker opTracker : dsr.getOpTrackers()) {
+ synchronized (opTracker) {
+ asyncFlush(dsr, opTracker, indexPredicate);
}
}
}
@@ -826,45 +594,38 @@
}
@Override
- public void closeDatasets(Set<Integer> datasetsToClose) throws HyracksDataException {
- synchronized (datasets) {
- for (DatasetResource dsr : datasets.values()) {
- if (dsr.isOpen() && datasetsToClose.contains(dsr.getDatasetID())) {
- closeDataset(dsr);
- }
+ public synchronized void closeDatasets(Set<Integer> datasetsToClose) throws HyracksDataException {
+ ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
+ for (DatasetResource dsr : openDatasets) {
+ if (dsr.isOpen() && datasetsToClose.contains(dsr.getDatasetID())) {
+ closeDataset(dsr);
}
}
}
@Override
- public void closeAllDatasets() throws HyracksDataException {
- synchronized (datasets) {
- for (DatasetResource dsr : datasets.values()) {
- if (dsr.isOpen()) {
- closeDataset(dsr);
- }
+ public synchronized void closeAllDatasets() throws HyracksDataException {
+ ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
+ for (DatasetResource dsr : openDatasets) {
+ if (dsr.isOpen()) {
+ closeDataset(dsr);
}
}
}
@Override
public synchronized void stop(boolean dumpState, OutputStream outputStream) throws IOException {
- stopLock.writeLock().lock();
- try {
- if (stopped) {
- return;
- }
- if (dumpState) {
- dumpState(outputStream);
- }
-
- closeAllDatasets();
-
- datasets.clear();
- stopped = true;
- } finally {
- stopLock.writeLock().unlock();
+ if (stopped) {
+ return;
}
+ if (dumpState) {
+ dumpState(outputStream);
+ }
+
+ closeAllDatasets();
+
+ datasets.clear();
+ stopped = true;
}
@Override
@@ -904,11 +665,9 @@
@Override
public void flushDataset(IReplicationStrategy replicationStrategy, IntPredicate partitions)
throws HyracksDataException {
- synchronized (datasets) {
- for (DatasetResource dsr : datasets.values()) {
- if (dsr.isOpen() && replicationStrategy.isMatch(dsr.getDatasetID())) {
- flushDatasetOpenIndexes(dsr, partitions, false);
- }
+ for (DatasetResource dsr : datasets.values()) {
+ if (dsr.isOpen() && replicationStrategy.isMatch(dsr.getDatasetID())) {
+ flushDatasetOpenIndexes(dsr, partitions, false);
}
}
}
@@ -963,60 +722,47 @@
//TODO refactor this method with unregister method
@Override
- public void closeIfOpen(String resourcePath) throws HyracksDataException {
- stopLock.readLock().lock();
- try {
- validateDatasetLifecycleManagerState();
- ReentrantReadWriteLock resourceLock = getResourceLock(resourcePath);
- resourceLock.writeLock().lock();
- try {
- int did = getDIDfromResourcePath(resourcePath);
- long resourceID = getResourceIDfromResourcePath(resourcePath);
+ public synchronized void closeIfOpen(String resourcePath) throws HyracksDataException {
+ validateDatasetLifecycleManagerState();
+ int did = getDIDfromResourcePath(resourcePath);
+ long resourceID = getResourceIDfromResourcePath(resourcePath);
- DatasetResource dsr = datasets.get(did);
- IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
+ DatasetResource dsr = datasets.get(did);
+ IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
- if (dsr == null || iInfo == null) {
- return;
- }
+ if (dsr == null || iInfo == null) {
+ return;
+ }
- PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
- if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
- if (LOGGER.isErrorEnabled()) {
- final String logMsg = String.format(
- "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)",
- resourcePath, iInfo.getReferenceCount(), opTracker.getNumActiveOperations());
- LOGGER.error(logMsg);
- }
- throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX,
- StoragePathUtil.getIndexNameFromPath(resourcePath));
- }
-
- // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
- DatasetInfo dsInfo = dsr.getDatasetInfo();
- dsInfo.waitForIO();
- closeIndex(iInfo);
- dsInfo.removeIndex(resourceID);
- synchronized (dsInfo) {
- if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty()
- && !dsInfo.isExternal()) {
- removeDatasetFromCache(dsInfo.getDatasetID());
- }
- }
- } finally {
- resourceLock.writeLock().unlock();
+ PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
+ if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
+ if (LOGGER.isErrorEnabled()) {
+ final String logMsg = String.format(
+ "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)",
+ resourcePath, iInfo.getReferenceCount(), opTracker.getNumActiveOperations());
+ LOGGER.error(logMsg);
}
- } finally {
- stopLock.readLock().unlock();
+ throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX,
+ StoragePathUtil.getIndexNameFromPath(resourcePath));
+ }
+
+ // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
+ DatasetInfo dsInfo = dsr.getDatasetInfo();
+ dsInfo.waitForIO();
+ closeIndex(iInfo);
+ dsInfo.removeIndex(resourceID);
+ synchronized (dsInfo) {
+ if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty()
+ && !dsInfo.isExternal()) {
+ removeDatasetFromCache(dsInfo.getDatasetID());
+ }
}
}
@Override
- public void closePartition(int partitionId) {
- synchronized (datasets) {
- for (DatasetResource ds : datasets.values()) {
- ds.removePartition(partitionId);
- }
+ public synchronized void closePartition(int partitionId) {
+ for (DatasetResource ds : datasets.values()) {
+ ds.removePartition(partitionId);
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index 0c8f141..8e3081d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -19,10 +19,10 @@
package org.apache.asterix.common.context;
import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
@@ -54,9 +54,9 @@
public DatasetResource(DatasetInfo datasetInfo) {
this.datasetInfo = datasetInfo;
- this.datasetPrimaryOpTrackers = new ConcurrentHashMap<>();
- this.datasetComponentIdGenerators = new ConcurrentHashMap<>();
- this.datasetRateLimiters = new ConcurrentHashMap<>();
+ this.datasetPrimaryOpTrackers = new HashMap<>();
+ this.datasetComponentIdGenerators = new HashMap<>();
+ this.datasetRateLimiters = new HashMap<>();
this.recoveredPartitions = new HashSet<>();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 9f0f5c7..2fdfba4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -139,9 +139,11 @@
return ResourceReference.of(fileAbsolutePath).getFileRelativePath().toString();
}
- public static String getDatasetPartitionPath(String relativePath) {
+ public static FileReference getIndexRootPath(IIOManager ioManager, String relativePath)
+ throws HyracksDataException {
int separatorIndex = relativePath.lastIndexOf(File.separatorChar);
- return relativePath.substring(0, separatorIndex);
+ String parentDirectory = relativePath.substring(0, separatorIndex);
+ return ioManager.resolve(parentDirectory);
}
/**
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/common/context/DatasetLifecycleManagerConcurrentTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/common/context/DatasetLifecycleManagerConcurrentTest.java
deleted file mode 100644
index d15a9d4..0000000
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/common/context/DatasetLifecycleManagerConcurrentTest.java
+++ /dev/null
@@ -1,2104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.context;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-
-import org.apache.asterix.common.config.StorageProperties;
-import org.apache.asterix.common.dataflow.DatasetLocalResource;
-import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
-import org.apache.asterix.common.transactions.ILogManager;
-import org.apache.asterix.common.transactions.IRecoveryManager;
-import org.apache.hyracks.api.application.INCServiceContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.api.replication.IReplicationJob;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
-import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
-import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
-import org.apache.hyracks.storage.am.lsm.common.cloud.IIndexDiskCacheManager;
-import org.apache.hyracks.storage.common.IIndex;
-import org.apache.hyracks.storage.common.IIndexAccessParameters;
-import org.apache.hyracks.storage.common.IIndexBulkLoader;
-import org.apache.hyracks.storage.common.IIndexCursor;
-import org.apache.hyracks.storage.common.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.ISearchPredicate;
-import org.apache.hyracks.storage.common.LocalResource;
-import org.apache.hyracks.storage.common.buffercache.IBufferCache;
-import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
-import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Concurrent tests for the DatasetLifecycleManager class.
- *
- * This test class verifies that the DatasetLifecycleManager properly handles
- * concurrent register, open, close, and unregister operations on indexes.
- *
- * The tests use mock objects to simulate the index lifecycle and verify:
- * 1. Thread safety of operations
- * 2. Correct handling of re-registration attempts
- * 3. Proper activation/deactivation during open/close
- * 4. Correct identity preservation for index instances
- * 5. Concurrent access to shared resources
- *
- * Each test method focuses on a specific aspect of concurrent behavior:
- * - testConcurrentRegisterAndOpen: Tests register and open operations
- * - testConcurrentRegisterOpenAndUnregister: Tests the full lifecycle
- * - testRegisterAlreadyRegisteredIndex: Tests re-registration behavior
- * - testConcurrentOpenSameIndex: Tests opening the same index concurrently
- * - testConcurrentCloseAfterOpen: Tests closing after open operations
- * - testMockIndexIdentity: Tests the mock setup itself
- */
-public class DatasetLifecycleManagerConcurrentTest {
- private static final Logger LOGGER = LogManager.getLogger();
-
- // Configuration constants
- private static final int NUM_DATASETS = 3;
- private static final int NUM_PARTITIONS = 2;
- private static final int NUM_THREADS = Runtime.getRuntime().availableProcessors();
- private static final int NUM_OPERATIONS_PER_THREAD = 20;
-
- // For resource paths
- private static final String DB_NAME = "Default";
- private static final String SCOPE_NAME = "SDefault";
- private static final String STORAGE_DIR = "storage";
-
- private DatasetLifecycleManager datasetLifecycleManager;
- private INCServiceContext mockServiceContext;
- private StorageProperties mockStorageProperties;
- private ILocalResourceRepository mockResourceRepository;
- private IRecoveryManager mockRecoveryManager;
- private ILogManager mockLogManager;
- private IVirtualBufferCache mockVBC;
- private IIndexCheckpointManagerProvider mockCheckpointProvider;
- private IDiskResourceCacheLockNotifier mockLockNotifier;
- private IIOManager mockIOManager;
-
- private Map<String, MockIndex> mockIndexes;
- private Map<String, LocalResource> mockResources;
-
- private ExecutorService executorService;
- private List<String> createdResourcePaths;
-
- private Map<Integer, DatasetResource> datasetResourceMap;
-
- // Add this field after the other private fields
- private boolean lazyRecoveryEnabled = false;
-
- /**
- * Sets up the test environment with mock objects and resources.
- * This includes:
- * 1. Creating mock service context and other components
- * 2. Setting up mock resources with unique IDs
- * 3. Creating the DatasetLifecycleManager with mock dependencies
- * 4. Setting up a thread pool for concurrent operations
- */
- @Before
- public void setUp() throws Exception {
- // Initialize collections
- mockIndexes = new ConcurrentHashMap<>();
- mockResources = new ConcurrentHashMap<>();
- createdResourcePaths = new ArrayList<>();
- datasetResourceMap = new HashMap<>();
-
- // Set up mocks
- mockServiceContext = mock(INCServiceContext.class);
- mockStorageProperties = mock(StorageProperties.class);
- mockResourceRepository = mock(ILocalResourceRepository.class);
- mockRecoveryManager = mock(IRecoveryManager.class);
- mockLogManager = mock(ILogManager.class);
- mockVBC = mock(IVirtualBufferCache.class);
- mockCheckpointProvider = mock(IIndexCheckpointManagerProvider.class);
- mockLockNotifier = mock(IDiskResourceCacheLockNotifier.class);
- mockIOManager = mock(IIOManager.class);
-
- // Mock behavior
- when(mockServiceContext.getIoManager()).thenReturn(mockIOManager);
- when(mockStorageProperties.getMemoryComponentsNum()).thenReturn(2);
- when(mockRecoveryManager.isLazyRecoveryEnabled()).thenReturn(lazyRecoveryEnabled);
-
- // Create DatasetLifecycleManager FIRST
- datasetLifecycleManager =
- new DatasetLifecycleManager(mockServiceContext, mockStorageProperties, mockResourceRepository,
- mockRecoveryManager, mockLogManager, mockVBC, mockCheckpointProvider, mockLockNotifier);
-
- // NEXT get the datasets map via reflection
- try {
- Field datasetsField = DatasetLifecycleManager.class.getDeclaredField("datasets");
- datasetsField.setAccessible(true);
- @SuppressWarnings("unchecked")
- Map<Integer, DatasetResource> datasets =
- (Map<Integer, DatasetResource>) datasetsField.get(datasetLifecycleManager);
- if (datasets != null) {
- LOGGER.info("Accessed datasets map successfully, found {} entries", datasets.size());
- // Store a direct reference to the actual map
- datasetResourceMap = datasets;
- } else {
- LOGGER.warn("Retrieved datasets map is null");
- }
- } catch (Exception e) {
- LOGGER.error("Failed to access datasets field via reflection", e);
- }
-
- // FINALLY setup mock resources (so they get the real datasets map)
- setupMockResources();
-
- executorService = Executors.newFixedThreadPool(NUM_THREADS);
- }
-
- /**
- * Helper method to set the lazy recovery flag and reconfigure the mock
- */
- public void setLazyRecoveryEnabled(boolean enabled) {
- this.lazyRecoveryEnabled = enabled;
- when(mockRecoveryManager.isLazyRecoveryEnabled()).thenReturn(enabled);
- LOGGER.info("Set lazy recovery enabled to: {}", enabled);
- }
-
- /**
- * Creates mock resources for testing.
- * For each combination of dataset and partition:
- * 1. Creates primary and secondary indexes with unique resource IDs
- * 2. Sets up the repository to return the appropriate resources
- * 3. Verifies that all resources have unique IDs
- *
- * This is crucial for testing the register/open/unregister lifecycle,
- * as it ensures each resource path maps to a unique mock index.
- */
- private void setupMockResources() throws HyracksDataException {
- LOGGER.debug("Setting up mock resources");
-
- // Setup mock resources for different datasets and partitions
- for (int i = 0; i < NUM_DATASETS; i++) {
- // Use datasetId starting from 101 to avoid reserved IDs (below 100)
- int datasetId = 101 + i;
- String datasetName = "ds" + i;
- for (int j = 0; j < NUM_PARTITIONS; j++) {
- // Create primary index
- String primaryPath = getResourcePath(j, datasetName, 0, datasetName);
- setupMockResource(primaryPath, datasetId, j, datasetId * 100 + j * 10);
-
- // Create secondary index
- String secondaryPath = getResourcePath(j, datasetName, 1, datasetName + "_idx");
- setupMockResource(secondaryPath, datasetId, j, datasetId * 100 + j * 10 + 1);
-
- createdResourcePaths.add(primaryPath);
- createdResourcePaths.add(secondaryPath);
- }
- }
-
- // Wire up the mockResourceRepository to return our mock resources
- when(mockResourceRepository.get(anyString())).thenAnswer(invocation -> {
- String path = invocation.getArgument(0);
- LocalResource resource = mockResources.get(path);
- LOGGER.debug("get({}) returning {}", path, resource);
- return resource;
- });
-
- LOGGER.debug("Created {} mock resources", mockResources.size());
- }
-
- /**
- * Sets up a mock resource for a specific resource path.
- * For each resource path, this method:
- * 1. Creates a unique MockIndex instance
- * 2. Creates a mock DatasetLocalResource that returns the MockIndex
- * 3. Creates a mock LocalResource with the unique resourceId
- * 4. Sets up the IO manager to resolve the resource path
- *
- * The key pattern here is that each resource path must map to a unique
- * MockIndex, and each resource must have a unique ID.
- */
- private void setupMockResource(String resourcePath, int datasetId, int partition, long resourceId)
- throws HyracksDataException {
-
- // Create and store a mock index, passing the datasetResourceMap
- MockIndex mockIndex = new MockIndex(resourcePath, datasetId, datasetResourceMap);
- mockIndexes.put(resourcePath, mockIndex);
-
- // Create a mock dataset local resource
- DatasetLocalResource mockDatasetResource = mock(DatasetLocalResource.class);
- when(mockDatasetResource.getDatasetId()).thenReturn(datasetId);
- when(mockDatasetResource.getPartition()).thenReturn(partition);
-
- // Important: We need to ensure each createInstance call returns the specific MockIndex
- // for this resource path
- when(mockDatasetResource.createInstance(mockServiceContext)).thenAnswer(invocation -> {
- return mockIndexes.get(resourcePath);
- });
-
- // Create a mock local resource
- LocalResource mockLocalResource = mock(LocalResource.class);
- when(mockLocalResource.getId()).thenReturn(resourceId);
- when(mockLocalResource.getPath()).thenReturn(resourcePath);
- when(mockLocalResource.getResource()).thenReturn(mockDatasetResource);
-
- mockResources.put(resourcePath, mockLocalResource);
-
- // Create a mock file reference
- FileReference mockFileRef = mock(FileReference.class);
- when(mockFileRef.getRelativePath()).thenReturn(resourcePath);
- when(mockIOManager.resolveAbsolutePath(resourcePath)).thenReturn(mockFileRef);
- }
-
- /**
- * Generates a standardized resource path for a given partition, dataset, and index.
- * Format: storage/partition_X/DbName/ScopeName/DatasetName/ReplicationFactor/ResourceID_IndexName
- * This ensures consistent resource path formatting throughout the tests.
- *
- * Note: The replication factor is always 0 in production environments, so we match that here.
- */
- private String getResourcePath(int partition, String datasetName, int resourceId, String indexName) {
- // Format: storage/partition_X/DbName/ScopeName/DatasetName/ReplicationFactor/IndexName
- // Always use 0 for replication factor, and include resourceId as part of the index name
- return String.format("%s/partition_%d/%s/%s/%s/0/%s_%s", STORAGE_DIR, partition, DB_NAME, SCOPE_NAME,
- datasetName, resourceId, indexName);
- }
-
- /**
- * Cleans up after tests by shutting down the executor service.
- */
- @After
- public void tearDown() throws Exception {
- executorService.shutdownNow();
- executorService.awaitTermination(5, TimeUnit.SECONDS);
- datasetLifecycleManager.stop(false, null);
- }
-
- /**
- * Tests concurrent registration and opening of indexes.
- * This test verifies that:
- * 1. Multiple threads can concurrently register and open indexes without causing errors
- * 2. Re-registration of an already registered index returns the same index instance
- * 3. Indexes are properly activated when opened
- * 4. All register and open operations properly maintain index state
- */
- @Test
- public void testConcurrentRegisterIfAbsentAndOpen() throws Exception {
- LOGGER.info("Starting testConcurrentRegisterAndOpen");
-
- final CyclicBarrier barrier = new CyclicBarrier(NUM_THREADS);
- final AtomicInteger successCount = new AtomicInteger(0);
- final AtomicInteger errorCount = new AtomicInteger(0);
- final Map<String, IIndex> firstRegistrations = new ConcurrentHashMap<>();
- final List<Future<?>> futures = new ArrayList<>();
-
- // Register half of the resources upfront
- int preRegisteredCount = createdResourcePaths.size() / 2;
- for (int i = 0; i < preRegisteredCount; i++) {
- String resourcePath = createdResourcePaths.get(i);
- IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
- firstRegistrations.put(resourcePath, index);
- }
-
- // Create threads that will randomly register/re-register and open indexes
- for (int i = 0; i < NUM_THREADS; i++) {
- futures.add(executorService.submit(() -> {
- try {
- barrier.await(); // Ensure all threads start at the same time
-
- Random random = new Random();
- for (int j = 0; j < NUM_OPERATIONS_PER_THREAD; j++) {
- // Pick a random resource path
- String resourcePath = createdResourcePaths.get(random.nextInt(createdResourcePaths.size()));
-
- // Randomly choose between register or open
- if (random.nextBoolean()) {
- IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
- assertNotNull("Index should not be null after register", index);
-
- // If this path was previously registered, verify it's the same instance
- IIndex firstInstance = firstRegistrations.get(resourcePath);
- if (firstInstance != null) {
- assertSame("Re-registration should return the same index instance", firstInstance,
- index);
- } else {
- // First time registering this path, record the instance
- firstRegistrations.putIfAbsent(resourcePath, index);
- }
-
- successCount.incrementAndGet();
- } else {
- try {
- // Try to register first if not already registered
- if (!firstRegistrations.containsKey(resourcePath)) {
- IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
- firstRegistrations.putIfAbsent(resourcePath, index);
- }
-
- // Now open it
- datasetLifecycleManager.open(resourcePath);
- ILSMIndex index = (ILSMIndex) datasetLifecycleManager.get(resourcePath);
- assertNotNull("Index should not be null after open", index);
- assertTrue("Index should be activated after open", ((MockIndex) index).isActivated());
- successCount.incrementAndGet();
- } catch (Exception e) {
- throw e;
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- errorCount.incrementAndGet();
- }
- }));
- }
-
- // Wait for all threads to complete
- for (Future<?> future : futures) {
- future.get();
- }
-
- assertEquals("No unexpected errors should occur", 0, errorCount.get());
- LOGGER.info("Completed testConcurrentRegisterAndOpen: {} successful operations", successCount.get());
- assertTrue("Some operations should succeed", successCount.get() > 0);
-
- // Verify all resources are now registered
- for (String resourcePath : createdResourcePaths) {
- IIndex index = firstRegistrations.get(resourcePath);
- assertNotNull("All resources should be registered", index);
-
- // Verify one final time that re-registration returns the same instance
- IIndex reRegisteredIndex = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
- assertSame("Final re-registration should return the same index instance", index, reRegisteredIndex);
- }
- }
-
- /**
- * Tests concurrent register, open, and unregister operations.
- * This test verifies that:
- * 1. Multiple threads can concurrently perform all lifecycle operations (register, open, unregister)
- * 2. Re-registration of an already registered index returns the same index instance
- * 3. Resources can be properly unregistered after being opened and closed
- * 4. Concurrent lifecycle operations do not interfere with each other
- */
- @Test
- public void testConcurrentRegisterIfAbsentOpenAndUnregister() throws Exception {
- LOGGER.info("Starting testConcurrentRegisterOpenAndUnregister with {} threads", NUM_THREADS);
-
- final Map<String, IIndex> registeredInstances = new HashMap<>();
- final AtomicInteger successCount = new AtomicInteger(0);
- final AtomicInteger expectedErrorCount = new AtomicInteger(0);
- final AtomicInteger unexpectedErrorCount = new AtomicInteger(0);
-
- List<Future<?>> futures = new ArrayList<>();
- for (int i = 0; i < NUM_THREADS; i++) {
- futures.add(executorService.submit(() -> {
- try {
- final Random random = new Random();
- for (int j = 0; j < NUM_OPERATIONS_PER_THREAD; j++) {
- // Pick a random resource path
- String resourcePath = createdResourcePaths.get(random.nextInt(createdResourcePaths.size()));
-
- try {
- // Randomly choose between register, open, or unregister
- int op = random.nextInt(3);
- if (op == 0) {
- // Register index
- LOGGER.debug("Thread {} registering resource {}", Thread.currentThread().getId(),
- resourcePath);
-
- IIndex index = datasetLifecycleManager.get(resourcePath);
- synchronized (registeredInstances) {
- registeredInstances.put(resourcePath, index);
- }
-
- LOGGER.debug("Thread {} registered resource {}", Thread.currentThread().getId(),
- resourcePath);
- successCount.incrementAndGet();
- } else if (op == 1) {
- // Open index
- LOGGER.debug("Thread {} opening resource {}", Thread.currentThread().getId(),
- resourcePath);
-
- IIndex index;
- synchronized (registeredInstances) {
- index = registeredInstances.get(resourcePath);
- }
-
- if (index != null) {
- try {
- datasetLifecycleManager.open(resourcePath);
- LOGGER.debug("Thread {} opened resource {}", Thread.currentThread().getId(),
- resourcePath);
- successCount.incrementAndGet();
- } catch (HyracksDataException e) {
- if (e.getMessage() != null
- && e.getMessage().contains("HYR0104: Index does not exist")) {
- LOGGER.debug("Thread {} failed to open unregistered resource {}: {}",
- Thread.currentThread().getId(), resourcePath, e.getMessage());
- expectedErrorCount.incrementAndGet();
- } else {
- LOGGER.error("Thread {} failed to open resource {}: {}",
- Thread.currentThread().getId(), resourcePath, e.getMessage(), e);
- unexpectedErrorCount.incrementAndGet();
- }
- }
- }
- } else {
- // Unregister index
- LOGGER.debug("Thread {} unregistering resource {}", Thread.currentThread().getId(),
- resourcePath);
-
- try {
- datasetLifecycleManager.unregister(resourcePath);
- LOGGER.debug("Thread {} unregistered resource {}", Thread.currentThread().getId(),
- resourcePath);
- synchronized (registeredInstances) {
- registeredInstances.remove(resourcePath);
- }
- successCount.incrementAndGet();
- } catch (HyracksDataException e) {
- if (e.getMessage() != null
- && (e.getMessage().contains("HYR0104: Index does not exist")
- || e.getMessage().contains("HYR0105: Cannot drop in-use index"))) {
- LOGGER.debug("Thread {} failed to unregister resource {}: {}",
- Thread.currentThread().getId(), resourcePath, e.getMessage());
- expectedErrorCount.incrementAndGet();
- } else {
- LOGGER.error("Thread {} failed to unregister resource {}: {}",
- Thread.currentThread().getId(), resourcePath, e.getMessage(), e);
- unexpectedErrorCount.incrementAndGet();
- }
- }
- }
- } catch (Exception e) {
- LOGGER.error("Thread {} encountered error on resource {}: {}",
- Thread.currentThread().getId(), resourcePath, e.getMessage(), e);
- unexpectedErrorCount.incrementAndGet();
- }
- }
- } catch (Exception e) {
- LOGGER.error("Thread {} encountered unexpected error: {}", Thread.currentThread().getId(),
- e.getMessage(), e);
- unexpectedErrorCount.incrementAndGet();
- }
- }));
- }
-
- // Wait for all threads to complete
- for (Future<?> future : futures) {
- future.get();
- }
-
- // Print final stats
- int remainingRegistered = registeredInstances.size();
- LOGGER.info(
- "testConcurrentRegisterOpenAndUnregister completed - Success: {}, Expected Errors: {}, Unexpected Errors: {}, Remaining registered: {}",
- successCount.get(), expectedErrorCount.get(), unexpectedErrorCount.get(), remainingRegistered);
-
- // Check results
- assertEquals("No unexpected errors should occur", 0, unexpectedErrorCount.get());
- assertTrue("Some operations should succeed", successCount.get() > 0);
- LOGGER.info("Expected errors occurred: {} - these are normal in concurrent environment",
- expectedErrorCount.get());
- }
-
- /**
- * Tests behavior when re-registering already registered indexes.
- * This test verifies that:
- * 1. Registering an index returns a valid index instance
- * 2. Re-registering the same index returns the same instance (not a new one)
- * 3. Concurrent re-registrations and opens work correctly
- * 4. The identity of index instances is preserved throughout operations
- */
- @Test
- public void testRegisterIfAbsentAlreadyRegisteredIndex() throws Exception {
- LOGGER.info("Starting testRegisterAlreadyRegisteredIndex");
-
- // Register all resources first
- Map<String, IIndex> firstRegistrations = new HashMap<>();
- for (String resourcePath : createdResourcePaths) {
- IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
- assertNotNull("First registration should succeed", index);
- firstRegistrations.put(resourcePath, index);
-
- // Verify the returned index is the same as our mock index
- MockIndex mockIndex = mockIndexes.get(resourcePath);
- assertSame("Register should return our mock index for " + resourcePath, mockIndex, index);
- }
-
- // Try to register again - should return the same instances without re-registering
- for (String resourcePath : createdResourcePaths) {
- IIndex reRegisteredIndex = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
- assertNotNull("Re-registration should succeed", reRegisteredIndex);
- // Verify we get back the same instance (no re-registration occurred)
- assertSame("Re-registration should return the same index instance", firstRegistrations.get(resourcePath),
- reRegisteredIndex);
-
- // Double check it's still our mock index
- MockIndex mockIndex = mockIndexes.get(resourcePath);
- assertSame("Re-register should still return our mock index for " + resourcePath, mockIndex,
- reRegisteredIndex);
- }
-
- // Now try concurrent open and re-register operations
- final CountDownLatch startLatch = new CountDownLatch(1);
- final CountDownLatch completionLatch = new CountDownLatch(NUM_THREADS);
- final AtomicInteger openSuccesses = new AtomicInteger(0);
- final ConcurrentHashMap<String, Integer> reRegisterCounts = new ConcurrentHashMap<>();
-
- for (int i = 0; i < NUM_THREADS; i++) {
- final int threadId = i;
- executorService.submit(() -> {
- try {
- startLatch.await();
-
- // Even threads try to open, odd threads try to re-register
- if (threadId % 2 == 0) {
- String resourcePath = createdResourcePaths.get(threadId % createdResourcePaths.size());
- datasetLifecycleManager.open(resourcePath);
- ILSMIndex index = (ILSMIndex) datasetLifecycleManager.get(resourcePath);
- assertNotNull("Index should not be null after open", index);
- assertTrue("Index should be activated after open", ((MockIndex) index).isActivated());
- openSuccesses.incrementAndGet();
- } else {
- String resourcePath = createdResourcePaths.get(threadId % createdResourcePaths.size());
- IIndex reRegisteredIndex = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
- assertNotNull("Re-registration should succeed", reRegisteredIndex);
- // Keep track of re-registrations
- reRegisterCounts.compute(resourcePath, (k, v) -> v == null ? 1 : v + 1);
- // Verify it's the same instance as the original registration
- assertSame("Re-registration should return the same index instance",
- firstRegistrations.get(resourcePath), reRegisteredIndex);
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- completionLatch.countDown();
- }
- });
- }
-
- // Start all threads
- startLatch.countDown();
-
- // Wait for completion
- completionLatch.await(10, TimeUnit.SECONDS);
-
- // Verify results
- assertEquals("Half of threads should succeed in opening", NUM_THREADS / 2, openSuccesses.get());
-
- // Verify that re-registration attempts occurred for each resource
- for (String path : createdResourcePaths) {
- // Some resources should have been re-registered multiple times
- Integer reRegCount = reRegisterCounts.get(path);
- if (reRegCount != null) {
- LOGGER.info("Resource path {} was re-registered {} times", path, reRegCount);
- }
- }
-
- LOGGER.info("Completed testRegisterAlreadyRegisteredIndex: {} successful opens", openSuccesses.get());
- }
-
- /**
- * Tests concurrent opening of the same index by multiple threads.
- * This test verifies that:
- * 1. Multiple threads can concurrently open the same index without errors
- * 2. The index is properly activated after being opened
- * 3. Concurrent opens do not interfere with each other
- */
- @Test
- public void testConcurrentOpenSameIndex() throws Exception {
- LOGGER.info("Starting testConcurrentOpenSameIndex");
-
- final int THREADS = 10;
- final String RESOURCE_PATH = createdResourcePaths.get(0); // First created resource path
- final CountDownLatch startLatch = new CountDownLatch(1);
- final CountDownLatch completionLatch = new CountDownLatch(THREADS);
- final AtomicInteger errors = new AtomicInteger(0);
-
- // Register the index first - REQUIRED before open
- datasetLifecycleManager.registerIfAbsent(RESOURCE_PATH, null);
-
- // Create threads that will all open the same index
- for (int i = 0; i < THREADS; i++) {
- executorService.submit(() -> {
- try {
- startLatch.await();
- datasetLifecycleManager.open(RESOURCE_PATH);
- } catch (Exception e) {
- e.printStackTrace();
- errors.incrementAndGet();
- } finally {
- completionLatch.countDown();
- }
- });
- }
-
- // Start all threads simultaneously
- startLatch.countDown();
-
- // Wait for all threads to complete
- completionLatch.await(10, TimeUnit.SECONDS);
-
- assertEquals("No errors should occur when opening the same index concurrently", 0, errors.get());
-
- // Verify the index is actually open
- ILSMIndex index = (ILSMIndex) datasetLifecycleManager.get(RESOURCE_PATH);
- assertNotNull("Index should be retrievable", index);
- assertTrue("Index should be activated", ((MockIndex) index).isActivated());
-
- LOGGER.info("Completed testConcurrentOpenSameIndex: index activated={}", ((MockIndex) index).isActivated());
- }
-
- /**
- * Tests concurrent closing of an index after it has been opened.
- * This test verifies that:
- * 1. An index can be closed after being opened
- * 2. Multiple threads can attempt to close the same index
- * 3. The index is properly deactivated after being closed
- */
- @Test
- public void testConcurrentCloseAfterOpen() throws Exception {
- LOGGER.info("Starting testConcurrentCloseAfterOpen");
-
- final String RESOURCE_PATH = createdResourcePaths.get(1); // Second created resource path
- final CountDownLatch openLatch = new CountDownLatch(1);
- final CountDownLatch closeLatch = new CountDownLatch(1);
- final AtomicBoolean openSuccess = new AtomicBoolean(false);
- final AtomicBoolean closeSuccess = new AtomicBoolean(false);
-
- // Register the index first
- datasetLifecycleManager.registerIfAbsent(RESOURCE_PATH, null);
-
- // Thread to open the index
- executorService.submit(() -> {
- try {
- datasetLifecycleManager.open(RESOURCE_PATH);
- openSuccess.set(true);
- openLatch.countDown();
-
- // Wait a bit to simulate work with the open index
- Thread.sleep(100);
- } catch (Exception e) {
- e.printStackTrace();
- fail("Open operation failed: " + e.getMessage());
- }
- });
-
- // Thread to close the index after it's opened
- executorService.submit(() -> {
- try {
- openLatch.await(); // Wait for open to complete
- datasetLifecycleManager.close(RESOURCE_PATH);
- closeSuccess.set(true);
- closeLatch.countDown();
- } catch (Exception e) {
- e.printStackTrace();
- fail("Close operation failed: " + e.getMessage());
- }
- });
-
- // Wait for operations to complete
- closeLatch.await(5, TimeUnit.SECONDS);
-
- assertTrue("Open operation should succeed", openSuccess.get());
- assertTrue("Close operation should succeed", closeSuccess.get());
-
- LOGGER.info("Completed testConcurrentCloseAfterOpen: openSuccess={}, closeSuccess={}", openSuccess.get(),
- closeSuccess.get());
- }
-
- /**
- * Tests the identity and uniqueness of mock objects and resources.
- * This test verifies that:
- * 1. Each resource path has a unique MockIndex instance
- * 2. Each LocalResource has a unique ID
- * 3. The createInstance method returns the correct MockIndex for each resource path
- * 4. Registration returns the expected MockIndex instance
- *
- * This test is crucial for ensuring the test infrastructure itself is correctly set up.
- */
- @Test
- public void testMockIndexIdentity() throws HyracksDataException {
- LOGGER.info("Starting testMockIndexIdentity");
-
- // Verify that the mockIndexes map is correctly populated
- assertTrue("Mock indexes should be created", !mockIndexes.isEmpty());
- assertEquals("Should have created mocks for all resource paths", createdResourcePaths.size(),
- mockIndexes.size());
-
- // Verify each local resource has a unique ID
- Set<Long> resourceIds = new HashSet<>();
- for (LocalResource resource : mockResources.values()) {
- long id = resource.getId();
- if (!resourceIds.add(id)) {
- fail("Duplicate resource ID found: " + id + " for path: " + resource.getPath());
- }
- }
-
- // For each resource path, verify the mock setup
- for (String resourcePath : createdResourcePaths) {
- // Check if we have a mock index for this path
- MockIndex expectedMockIndex = mockIndexes.get(resourcePath);
- assertNotNull("Should have a mock index for " + resourcePath, expectedMockIndex);
-
- // Get the local resource
- LocalResource lr = mockResourceRepository.get(resourcePath);
- assertNotNull("Should have a local resource for " + resourcePath, lr);
-
- // Print resource ID for verification
- LOGGER.info("Resource path: {}, ID: {}", resourcePath, lr.getId());
-
- // Get the dataset resource
- DatasetLocalResource datasetResource = (DatasetLocalResource) lr.getResource();
- assertNotNull("Should have a dataset resource for " + resourcePath, datasetResource);
-
- // Call createInstance and verify we get our mock index
- IIndex createdIndex = datasetResource.createInstance(mockServiceContext);
- assertSame("createInstance should return our mock index for " + resourcePath, expectedMockIndex,
- createdIndex);
-
- // Now register and verify we get the same mock index
- IIndex registeredIndex = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
- assertSame("Registered index should be our mock index for " + resourcePath, expectedMockIndex,
- registeredIndex);
-
- // Confirm the reference equality of these objects
- LOGGER.info("Path: {}", resourcePath);
- LOGGER.info(" Expected mock: {}", System.identityHashCode(expectedMockIndex));
- LOGGER.info(" Created instance: {}", System.identityHashCode(createdIndex));
- LOGGER.info(" Registered instance: {}", System.identityHashCode(registeredIndex));
- }
-
- LOGGER.info("Completed testMockIndexIdentity: verified {} resources", createdResourcePaths.size());
- }
-
- /**
- * Tests that unregistering indexes works correctly, but only after they have been properly registered and opened.
- * This test verifies that:
- * 1. Indexes can be registered and opened successfully
- * 2. All indexes are properly activated after opening
- * 3. Indexes can be unregistered after being registered and opened
- * 4. After unregistering, the indexes are no longer retrievable
- * 5. The correct lifecycle sequence (register → open → unregister) is enforced
- */
- @Test
- public void testUnregisterAfterRegisterIfAbsentAndOpen() throws Exception {
- LOGGER.info("Starting testUnregisterAfterRegisterAndOpen");
-
- Map<String, IIndex> registeredIndexes = new HashMap<>();
- int totalIndexes = createdResourcePaths.size();
- AtomicInteger successfulUnregisters = new AtomicInteger(0);
-
- // Step 1: Register and open all indexes sequentially
- for (String resourcePath : createdResourcePaths) {
- LOGGER.debug("Registering and opening: {}", resourcePath);
-
- // Register the index
- IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
- assertNotNull("Index should be registered successfully", index);
- registeredIndexes.put(resourcePath, index);
-
- // After registration, we should have a DatasetResource
- LocalResource resource = mockResources.get(resourcePath);
- DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
- int datasetId = datasetLocalResource.getDatasetId();
- int partition = datasetLocalResource.getPartition();
-
- // Check if we have a DatasetResource in our map
- DatasetResource datasetResource = datasetResourceMap.get(datasetId);
- if (datasetResource != null) {
- LOGGER.debug("Found DatasetResource for dataset {}", datasetId);
-
- // See if there's an operation tracker for this partition already
- ILSMOperationTracker existingTracker = null;
- try {
- // Use reflection to get the datasetPrimaryOpTrackers map
- Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
- opTrackersField.setAccessible(true);
- Map<Integer, ILSMOperationTracker> opTrackers =
- (Map<Integer, ILSMOperationTracker>) opTrackersField.get(datasetResource);
-
- // Check if we already have a tracker for this partition
- existingTracker = opTrackers.get(partition);
- LOGGER.debug("Existing tracker for partition {}: {}", partition, existingTracker);
- } catch (Exception e) {
- LOGGER.error("Failed to access opTrackers field via reflection", e);
- }
- } else {
- LOGGER.debug("No DatasetResource found for dataset {}", datasetId);
- }
-
- // Open the index
- datasetLifecycleManager.open(resourcePath);
-
- // Verify index is activated
- MockIndex mockIndex = (MockIndex) index;
- assertTrue("Index should be activated after opening: " + resourcePath, mockIndex.isActivated());
-
- // Verify it's retrievable
- long resourceId = resource.getId();
- IIndex retrievedIndex = datasetLifecycleManager.getIndex(datasetId, resourceId);
- assertSame("Retrieved index should be the same instance", index, retrievedIndex);
- }
-
- LOGGER.info("All {} indexes registered and opened successfully", totalIndexes);
-
- // Step 2: Close and then unregister each index
- for (String resourcePath : createdResourcePaths) {
- LOGGER.debug("Closing and unregistering: {}", resourcePath);
-
- // Verify the index is activated before closing
- IIndex index = registeredIndexes.get(resourcePath);
- MockIndex mockIndex = (MockIndex) index;
- assertTrue("Index should be activated before closing: " + resourcePath, mockIndex.isActivated());
-
- // Get resource information before closing
- LocalResource resource = mockResources.get(resourcePath);
- DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
- int datasetId = datasetLocalResource.getDatasetId();
- int partition = datasetLocalResource.getPartition();
-
- // Close the index first to ensure reference count goes to 0
- LOGGER.debug("Closing index: {}", resourcePath);
- datasetLifecycleManager.close(resourcePath);
-
- // Brief pause to allow any asynchronous operations to complete
- Thread.sleep(50);
-
- // Get the operation tracker and verify it has 0 active operations
- DatasetResource datasetResource = datasetResourceMap.get(datasetId);
- ILSMOperationTracker opTracker = mockIndex.getOperationTracker();
-
- LOGGER.debug("Before unregister: Resource path={}, datasetId={}, partition={}, tracker={}", resourcePath,
- datasetId, partition, opTracker);
-
- // Unregister the index after it's closed
- LOGGER.debug("Unregistering index: {}", resourcePath);
- try {
- datasetLifecycleManager.unregister(resourcePath);
- successfulUnregisters.incrementAndGet();
- LOGGER.debug("Successfully unregistered index: {}", resourcePath);
- } catch (Exception e) {
- LOGGER.error("Failed to unregister index: {}", resourcePath, e);
- fail("Failed to unregister index: " + resourcePath + ": " + e.getMessage());
- }
-
- // Verify the index is no longer retrievable
- try {
- LocalResource resourceAfterUnregister = mockResources.get(resourcePath);
- long resourceIdAfterUnregister = resourceAfterUnregister.getId();
- IIndex retrievedIndexAfterUnregister =
- datasetLifecycleManager.getIndex(datasetId, resourceIdAfterUnregister);
- Assert.assertNull("Index should not be retrievable after unregister: " + resourcePath,
- retrievedIndexAfterUnregister);
- } catch (HyracksDataException e) {
- // This is also an acceptable outcome if getIndex throws an exception for unregistered indexes
- LOGGER.debug("Expected exception when retrieving unregistered index: {}", e.getMessage());
- }
- }
-
- assertEquals("All indexes should be unregistered", totalIndexes, successfulUnregisters.get());
- LOGGER.info("Completed testUnregisterAfterRegisterAndOpen: successfully unregistered {} indexes",
- successfulUnregisters.get());
- }
-
- /**
- * Tests that only one primary operation tracker is created per partition per dataset.
- * This test verifies that:
- * 1. When multiple indexes for the same dataset and partition are registered, they share the same operation tracker
- * 2. Different partitions of the same dataset have different operation trackers
- * 3. Different datasets have completely separate operation trackers
- */
- @Test
- public void testPrimaryOperationTrackerSingularity() throws Exception {
- LOGGER.info("Starting testPrimaryOperationTrackerSingularity");
-
- // Maps to track operation trackers by dataset and partition
- Map<Integer, Map<Integer, Object>> datasetToPartitionToTracker = new HashMap<>();
- Map<String, IIndex> registeredIndexes = new HashMap<>();
-
- // Step 1: Register all indexes first
- for (String resourcePath : createdResourcePaths) {
- LOGGER.debug("Registering index: {}", resourcePath);
-
- // Register the index
- IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
- assertNotNull("Index should be registered successfully", index);
- registeredIndexes.put(resourcePath, index);
-
- // Get dataset and partition information
- LocalResource resource = mockResources.get(resourcePath);
- DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
- int datasetId = datasetLocalResource.getDatasetId();
- int partition = datasetLocalResource.getPartition();
-
- // Check if we have a DatasetResource in our map
- DatasetResource datasetResource = datasetResourceMap.get(datasetId);
- assertNotNull("DatasetResource should be created for dataset " + datasetId, datasetResource);
-
- // At this point, operation tracker might not exist yet since it's created lazily during open()
- try {
- // Use reflection to get the datasetPrimaryOpTrackers map
- Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
- opTrackersField.setAccessible(true);
- Map<Integer, ILSMOperationTracker> opTrackers =
- (Map<Integer, ILSMOperationTracker>) opTrackersField.get(datasetResource);
-
- // Check if tracker already exists (might be null)
- Object existingTracker = opTrackers.get(partition);
- LOGGER.debug("Before open(): Tracker for dataset {} partition {}: {}", datasetId, partition,
- existingTracker);
- } catch (Exception e) {
- LOGGER.error("Failed to access opTrackers field via reflection", e);
- }
- }
-
- // Step 2: Open all indexes to trigger operation tracker creation
- for (String resourcePath : createdResourcePaths) {
- LOGGER.debug("Opening index: {}", resourcePath);
-
- // Open the index to trigger operation tracker creation
- datasetLifecycleManager.open(resourcePath);
-
- // Get dataset and partition information
- LocalResource resource = mockResources.get(resourcePath);
- DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
- int datasetId = datasetLocalResource.getDatasetId();
- int partition = datasetLocalResource.getPartition();
-
- // Now the operation tracker should exist
- DatasetResource datasetResource = datasetResourceMap.get(datasetId);
- Object opTracker = null;
- try {
- // Use reflection to get the datasetPrimaryOpTrackers map
- Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
- opTrackersField.setAccessible(true);
- Map<Integer, ILSMOperationTracker> opTrackers =
- (Map<Integer, ILSMOperationTracker>) opTrackersField.get(datasetResource);
-
- // Get the tracker for this partition
- opTracker = opTrackers.get(partition);
- assertNotNull("Operation tracker should exist after open() for partition " + partition, opTracker);
-
- LOGGER.debug("After open(): Found tracker for dataset {} partition {}: {}", datasetId, partition,
- System.identityHashCode(opTracker));
- } catch (Exception e) {
- LOGGER.error("Failed to access opTrackers field via reflection", e);
- fail("Failed to access operation trackers: " + e.getMessage());
- }
-
- // Store the tracker in our map for later comparison
- datasetToPartitionToTracker.computeIfAbsent(datasetId, k -> new HashMap<>()).putIfAbsent(partition,
- opTracker);
- }
-
- // Step 3: Create and register secondary indexes for each dataset/partition
- List<String> secondaryPaths = new ArrayList<>();
- for (String resourcePath : createdResourcePaths) {
- // Create a "secondary index" path by appending "-secondary" to the resource path
- String secondaryPath = resourcePath + "-secondary";
- secondaryPaths.add(secondaryPath);
-
- // Create a mock resource for this secondary index
- LocalResource originalResource = mockResources.get(resourcePath);
- DatasetLocalResource originalDatasetLocalResource = (DatasetLocalResource) originalResource.getResource();
- int datasetId = originalDatasetLocalResource.getDatasetId();
- int partition = originalDatasetLocalResource.getPartition();
-
- // Set up a new mock resource with the same dataset and partition but a different path
- setupMockResource(secondaryPath, datasetId, partition, originalResource.getId() + 10000);
-
- LOGGER.debug("Registering secondary index: {}", secondaryPath);
-
- // Register the secondary index
- IIndex secondaryIndex = datasetLifecycleManager.registerIfAbsent(secondaryPath, null);
- assertNotNull("Secondary index should be registered successfully", secondaryIndex);
- registeredIndexes.put(secondaryPath, secondaryIndex);
- }
-
- // Step 4: Open the secondary indexes to trigger operation tracker reuse
- for (String secondaryPath : secondaryPaths) {
- LOGGER.debug("Opening secondary index: {}", secondaryPath);
-
- // Open the secondary index
- datasetLifecycleManager.open(secondaryPath);
-
- // Get dataset and partition information
- LocalResource resource = mockResources.get(secondaryPath);
- DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
- int datasetId = datasetLocalResource.getDatasetId();
- int partition = datasetLocalResource.getPartition();
-
- // Get the operation tracker after opening
- DatasetResource datasetResource = datasetResourceMap.get(datasetId);
- Object secondaryOpTracker = null;
- try {
- // Use reflection to get the datasetPrimaryOpTrackers map
- Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
- opTrackersField.setAccessible(true);
- Map<Integer, ILSMOperationTracker> opTrackers =
- (Map<Integer, ILSMOperationTracker>) opTrackersField.get(datasetResource);
-
- // Get the tracker for this partition
- secondaryOpTracker = opTrackers.get(partition);
- assertNotNull("Operation tracker should exist for secondary index partition " + partition,
- secondaryOpTracker);
-
- LOGGER.debug("After opening secondary: Found tracker for dataset {} partition {}: {}", datasetId,
- partition, System.identityHashCode(secondaryOpTracker));
- } catch (Exception e) {
- LOGGER.error("Failed to access opTrackers field via reflection", e);
- fail("Failed to access operation trackers: " + e.getMessage());
- }
-
- // Verify this is the same tracker as the one used by the primary index
- Object originalTracker = datasetToPartitionToTracker.get(datasetId).get(partition);
- assertSame("Operation tracker should be reused for the same dataset/partition", originalTracker,
- secondaryOpTracker);
- }
-
- // Step 5: Verify that different partitions of the same dataset have different operation trackers
- for (Map.Entry<Integer, Map<Integer, Object>> datasetEntry : datasetToPartitionToTracker.entrySet()) {
- int datasetId = datasetEntry.getKey();
- Map<Integer, Object> partitionTrackers = datasetEntry.getValue();
-
- if (partitionTrackers.size() > 1) {
- LOGGER.debug("Verifying different trackers for different partitions in dataset {}", datasetId);
-
- // Collect all the tracker instances for this dataset
- Set<Object> uniqueTrackers = new HashSet<>(partitionTrackers.values());
-
- // The number of unique trackers should equal the number of partitions
- assertEquals("Each partition should have its own operation tracker", partitionTrackers.size(),
- uniqueTrackers.size());
- }
- }
-
- // Step 6: Verify that different datasets have different operation trackers for the same partition
- if (datasetToPartitionToTracker.size() > 1) {
- for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
- final int partitionId = partition;
-
- // Collect all trackers for this partition across different datasets
- List<Object> trackersForPartition = datasetToPartitionToTracker.entrySet().stream()
- .filter(entry -> entry.getValue().containsKey(partitionId))
- .map(entry -> entry.getValue().get(partitionId)).collect(Collectors.toList());
-
- if (trackersForPartition.size() > 1) {
- LOGGER.debug("Verifying different trackers across datasets for partition {}", partitionId);
-
- // All trackers should be unique (no sharing between datasets)
- Set<Object> uniqueTrackers = new HashSet<>(trackersForPartition);
- assertEquals("Each dataset should have its own operation tracker for partition " + partitionId,
- trackersForPartition.size(), uniqueTrackers.size());
- }
- }
- }
-
- // Step 7: Clean up - close all indexes
- for (String path : new ArrayList<>(registeredIndexes.keySet())) {
- try {
- datasetLifecycleManager.close(path);
- } catch (Exception e) {
- LOGGER.error("Error closing index {}", path, e);
- }
- }
-
- LOGGER.info(
- "Completed testPrimaryOperationTrackerSingularity: verified unique trackers for each dataset/partition combination");
- }
-
- /**
- * Tests that resources cannot be opened after unregistering unless re-registered first.
- * This test verifies:
- * 1. Resources must be registered before they can be opened
- * 2. After unregistering, resources cannot be opened until re-registered
- * 3. Re-registration of an unregistered resource works correctly
- */
- @Test
- public void testCannotOpenUnregisteredResource() throws Exception {
- LOGGER.info("Starting testCannotOpenUnregisteredResource");
-
- String resourcePath = createdResourcePaths.get(0);
-
- // Register resource first
- IIndex index = registerIfAbsentIndex(resourcePath);
- assertNotNull("Index should be registered successfully", index);
- LOGGER.debug("Registered resource {}", resourcePath);
-
- // Unregister it
- datasetLifecycleManager.unregister(resourcePath);
- LOGGER.debug("Unregistered resource {}", resourcePath);
-
- // Now try to open it - should fail
- try {
- datasetLifecycleManager.open(resourcePath);
- fail("Should not be able to open an unregistered resource");
- } catch (HyracksDataException e) {
- LOGGER.debug("Caught expected exception: {}", e.getMessage());
- assertTrue("Exception should mention index not existing", e.getMessage().contains("does not exist"));
- }
-
- // Re-register should work
- MockIndex mockIndex = mockIndexes.get(resourcePath);
- IIndex reRegisteredIndex = datasetLifecycleManager.registerIfAbsent(resourcePath, mockIndex);
- assertNotNull("Index should be re-registered successfully", reRegisteredIndex);
- LOGGER.debug("Re-registered resource {}", resourcePath);
-
- // Now open should work
- datasetLifecycleManager.open(resourcePath);
- LOGGER.debug("Successfully opened resource after re-registering");
-
- LOGGER.info("Completed testCannotOpenUnregisteredResource");
- }
-
- /**
- * Tests the full lifecycle of operation trackers to ensure they're properly created,
- * maintained throughout the index lifecycle, and cleaned up at the appropriate time.
- * This test verifies that:
- * 1. Operation trackers are created when indexes are opened for the first time
- * 2. Operation trackers remain stable during index usage
- * 3. Operation trackers are properly shared among indexes of the same dataset/partition
- * 4. Operation trackers are not prematurely nullified or cleared
- * 5. Operation trackers are correctly removed when all indexes of a dataset/partition are unregistered
- */
- @Test
- public void testOperationTrackerLifecycle() throws Exception {
- // Register primary index
- String dsName = "ds0";
- int dsId = 101; // Updated from 0 to 101 to match our new datasetId scheme
- int partition = 0;
- String primaryIndexName = dsName;
- String secondaryIndexName = dsName + "_idx";
- String primaryResourcePath = getResourcePath(partition, dsName, 0, primaryIndexName);
- String secondaryResourcePath = getResourcePath(partition, dsName, 1, secondaryIndexName);
-
- LOGGER.debug("Registering primary index at {}", primaryResourcePath);
- MockIndex mockPrimaryIndex = mockIndexes.get(primaryResourcePath);
- IIndex registeredIndex = datasetLifecycleManager.registerIfAbsent(primaryResourcePath, mockPrimaryIndex);
- assertNotNull("Index should be registered successfully", registeredIndex);
-
- // Get the dataset resource and check if it has been created
- DatasetResource datasetResource = datasetResourceMap.get(dsId);
- assertNotNull("Dataset resource should be created during registration", datasetResource);
-
- // Verify there's no operation tracker before opening
- ILSMOperationTracker trackerBeforeOpen = getOperationTracker(datasetResource, partition);
- LOGGER.debug("Operation tracker before open: {}", trackerBeforeOpen);
-
- // Open primary index which should create and register an operation tracker
- LOGGER.debug("Opening primary index at {}", primaryResourcePath);
- datasetLifecycleManager.open(primaryResourcePath);
-
- // Verify operation tracker exists after opening
- ILSMOperationTracker trackerAfterOpen = getOperationTracker(datasetResource, partition);
- LOGGER.debug("Operation tracker after primary open: {}", trackerAfterOpen);
- assertNotNull("Operation tracker should be created after opening primary index", trackerAfterOpen);
-
- // Verify the tracker is from our mock index
- MockIndex mockIndex = mockIndexes.get(primaryResourcePath);
- assertTrue("Mock index should be activated after open", mockIndex.isActivated());
-
- // Compare with the mock tracker from the index
- ILSMOperationTracker mockTracker = mockIndex.getOperationTracker();
- LOGGER.debug("Mock tracker from index: {}", mockTracker);
- assertEquals("Operation tracker should be the one from the mock index", mockTracker, trackerAfterOpen);
-
- // Open secondary index which should reuse the same operation tracker
- LOGGER.debug("Registering secondary index at {}", secondaryResourcePath);
- MockIndex mockSecondaryIndex = mockIndexes.get(secondaryResourcePath);
- datasetLifecycleManager.registerIfAbsent(secondaryResourcePath, mockSecondaryIndex);
-
- LOGGER.debug("Opening secondary index at {}", secondaryResourcePath);
- datasetLifecycleManager.open(secondaryResourcePath);
-
- // Verify operation tracker is still the same after opening secondary
- ILSMOperationTracker trackerAfterSecondaryOpen = getOperationTracker(datasetResource, partition);
- LOGGER.debug("Operation tracker after secondary open: {}", trackerAfterSecondaryOpen);
- assertNotNull("Operation tracker should still exist after opening secondary index", trackerAfterSecondaryOpen);
- assertEquals("Should still be the same operation tracker", trackerAfterOpen, trackerAfterSecondaryOpen);
-
- // Close primary index - should keep the tracker as secondary is still open
- LOGGER.debug("Closing primary index at {}", primaryResourcePath);
- datasetLifecycleManager.close(primaryResourcePath);
-
- // Verify operation tracker still exists
- ILSMOperationTracker trackerAfterPrimaryClose = getOperationTracker(datasetResource, partition);
- LOGGER.debug("Operation tracker after primary close: {}", trackerAfterPrimaryClose);
- assertNotNull("Operation tracker should still exist after closing primary index", trackerAfterPrimaryClose);
- assertEquals("Should still be the same operation tracker", trackerAfterOpen, trackerAfterPrimaryClose);
-
- // Close secondary index - should remove the tracker as all indexes are closed
- LOGGER.debug("Closing secondary index at {}", secondaryResourcePath);
- datasetLifecycleManager.close(secondaryResourcePath);
-
- // Verify operation tracker is removed
- ILSMOperationTracker trackerAfterSecondaryClose = getOperationTracker(datasetResource, partition);
- LOGGER.debug("Operation tracker after secondary close: {}", trackerAfterSecondaryClose);
-
- // Unregister indexes
- LOGGER.debug("Unregistering primary index at {}", primaryResourcePath);
- datasetLifecycleManager.unregister(primaryResourcePath);
-
- LOGGER.debug("Unregistering secondary index at {}", secondaryResourcePath);
- datasetLifecycleManager.unregister(secondaryResourcePath);
-
- // Verify dataset resource is removed
- DatasetResource datasetResourceAfterUnregister = datasetResourceMap.get(dsId);
- LOGGER.debug("Dataset resource after unregister: {}", datasetResourceAfterUnregister);
- }
-
- /**
- * Helper method to get the operation tracker for a dataset/partition.
- * Uses reflection to access the private datasetPrimaryOpTrackers map in DatasetResource.
- */
- private ILSMOperationTracker getOperationTracker(DatasetResource datasetResource, int partition) {
- try {
- // Access the datasetPrimaryOpTrackers map through reflection
- Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
- opTrackersField.setAccessible(true);
- @SuppressWarnings("unchecked")
- Map<Integer, ILSMOperationTracker> opTrackers =
- (Map<Integer, ILSMOperationTracker>) opTrackersField.get(datasetResource);
-
- // Return the operation tracker for this partition, if it exists
- if (opTrackers != null) {
- ILSMOperationTracker tracker = opTrackers.get(partition);
- if (tracker != null) {
- // Make sure we're getting a PrimaryIndexOperationTracker
- if (!(tracker instanceof PrimaryIndexOperationTracker)) {
- LOGGER.warn("Tracker is not a PrimaryIndexOperationTracker: {}", tracker.getClass().getName());
- }
- }
- return tracker;
- }
- } catch (Exception e) {
- LOGGER.error("Failed to get operation tracker via reflection", e);
- }
- return null;
- }
-
- /**
- * Tests that operation trackers are properly attached to indexes and survive open and close operations.
- * This test verifies:
- * 1. Operation trackers are properly created and associated with indexes
- * 2. Operation trackers remain valid during the index lifecycle
- * 3. Multiple opens and closes don't invalidate the operation tracker
- */
- @Test
- public void testOperationTrackerAttachment() throws Exception {
- LOGGER.info("Starting testOperationTrackerAttachment");
-
- // Choose a specific resource path to test with
- String resourcePath = createdResourcePaths.get(0);
-
- // Get dataset and partition information for this resource
- LocalResource resource = mockResources.get(resourcePath);
- DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
- int datasetId = datasetLocalResource.getDatasetId();
- int partition = datasetLocalResource.getPartition();
-
- // Register the resource first using our helper
- IIndex index = registerIfAbsentIndex(resourcePath);
- assertNotNull("Index should be registered successfully", index);
- LOGGER.debug("Registered resource {}", resourcePath);
-
- // Get the dataset resource
- DatasetResource datasetResource = datasetResourceMap.get(datasetId);
- assertNotNull("DatasetResource should exist after registration", datasetResource);
-
- // Verify no operation tracker exists before opening
- ILSMOperationTracker trackerBeforeOpen = getOperationTracker(datasetResource, partition);
- LOGGER.debug("Operation tracker before open: {}", trackerBeforeOpen);
-
- // Open the index - this should create and attach an operation tracker
- datasetLifecycleManager.open(resourcePath);
- LOGGER.debug("Opened resource {}", resourcePath);
-
- // Verify the operation tracker exists and is attached to the MockIndex
- ILSMOperationTracker trackerAfterOpen = getOperationTracker(datasetResource, partition);
- assertNotNull("Operation tracker should exist after open", trackerAfterOpen);
- LOGGER.debug("Operation tracker after open: {}", trackerAfterOpen);
-
- // Get the mock index and verify its tracker matches
- MockIndex mockIndex = mockIndexes.get(resourcePath);
- assertTrue("Mock index should be activated", mockIndex.isActivated());
-
- // Verify the mock index has the same operation tracker
- ILSMOperationTracker indexTracker = mockIndex.getOperationTracker();
- assertSame("Mock index should have the same operation tracker", trackerAfterOpen, indexTracker);
-
- // Close and unregister for cleanup
- datasetLifecycleManager.close(resourcePath);
- datasetLifecycleManager.unregister(resourcePath);
-
- LOGGER.info("Completed testOperationTrackerAttachment");
- }
-
- /**
- * A mock implementation of ILSMIndex for testing
- */
- private static class MockIndex implements ILSMIndex {
- private final String resourcePath;
- private final int datasetId;
- private boolean activated = false;
- // Create a mock of the specific PrimaryIndexOperationTracker class instead of the generic interface
- private PrimaryIndexOperationTracker mockTracker = mock(PrimaryIndexOperationTracker.class);
- private final Map<Integer, DatasetResource> datasetResourceMap;
-
- public MockIndex(String resourcePath, int datasetId, Map<Integer, DatasetResource> datasetResourceMap) {
- this.resourcePath = resourcePath;
- this.datasetId = datasetId;
- this.datasetResourceMap = datasetResourceMap;
- }
-
- @Override
- public void create() throws HyracksDataException {
-
- }
-
- @Override
- public void activate() {
- LOGGER.debug("Activating {}", this);
- activated = true;
- }
-
- @Override
- public void clear() throws HyracksDataException {
-
- }
-
- @Override
- public void deactivate() throws HyracksDataException {
- LOGGER.debug("Deactivating {}", this);
- activated = false;
- }
-
- @Override
- public void destroy() throws HyracksDataException {
-
- }
-
- @Override
- public void purge() throws HyracksDataException {
-
- }
-
- @Override
- public void deactivate(boolean flushOnExit) {
- LOGGER.debug("Deactivating {} with flushOnExit={}", this, flushOnExit);
- activated = false;
- }
-
- @Override
- public ILSMIndexAccessor createAccessor(IIndexAccessParameters iap) throws HyracksDataException {
- return null;
- }
-
- @Override
- public void validate() throws HyracksDataException {
-
- }
-
- @Override
- public IBufferCache getBufferCache() {
- return null;
- }
-
- @Override
- public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
- boolean checkIfEmptyIndex, IPageWriteCallback callback) throws HyracksDataException {
- return null;
- }
-
- @Override
- public int getNumOfFilterFields() {
- return 0;
- }
-
- public boolean isActivated() {
- return activated;
- }
-
- @Override
- public String toString() {
- // Ensure this matches exactly the resourcePath used for registration
- return resourcePath;
- }
-
- @Override
- public boolean isCurrentMutableComponentEmpty() {
- return true;
- }
-
- @Override
- public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMDiskComponent> diskComponents,
- IReplicationJob.ReplicationOperation operation, LSMOperationType opType) throws HyracksDataException {
-
- }
-
- @Override
- public boolean isMemoryComponentsAllocated() {
- return false;
- }
-
- @Override
- public void allocateMemoryComponents() throws HyracksDataException {
-
- }
-
- @Override
- public ILSMMemoryComponent getCurrentMemoryComponent() {
- return null;
- }
-
- @Override
- public int getCurrentMemoryComponentIndex() {
- return 0;
- }
-
- @Override
- public List<ILSMMemoryComponent> getMemoryComponents() {
- return List.of();
- }
-
- @Override
- public boolean isDurable() {
- return false;
- }
-
- @Override
- public void updateFilter(ILSMIndexOperationContext ictx, ITupleReference tuple) throws HyracksDataException {
-
- }
-
- @Override
- public ILSMDiskComponent createBulkLoadTarget() throws HyracksDataException {
- return null;
- }
-
- @Override
- public int getNumberOfAllMemoryComponents() {
- return 0;
- }
-
- @Override
- public ILSMHarness getHarness() {
- return null;
- }
-
- @Override
- public String getIndexIdentifier() {
- return "";
- }
-
- @Override
- public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
- boolean checkIfEmptyIndex, Map<String, Object> parameters) throws HyracksDataException {
- return null;
- }
-
- @Override
- public void resetCurrentComponentIndex() {
-
- }
-
- @Override
- public IIndexDiskCacheManager getDiskCacheManager() {
- return null;
- }
-
- @Override
- public void scheduleCleanup(List<ILSMDiskComponent> inactiveDiskComponents) throws HyracksDataException {
-
- }
-
- @Override
- public boolean isAtomic() {
- return false;
- }
-
- @Override
- public void commit() throws HyracksDataException {
-
- }
-
- @Override
- public void abort() throws HyracksDataException {
-
- }
-
- @Override
- public ILSMMergePolicy getMergePolicy() {
- return null;
- }
-
- @Override
- public ILSMOperationTracker getOperationTracker() {
- // This is the key method that DatasetLifecycleManager calls during open()
- // to get the operation tracker and register it
-
- LOGGER.debug("getOperationTracker() called for index: {}", resourcePath);
-
- // Get dataset ID and partition from the resource path
- int partition = -1;
-
- // Extract partition from resource path (storage/partition_X/...)
- String[] parts = resourcePath.split("/");
- for (int i = 0; i < parts.length; i++) {
- if (parts[i].startsWith("partition_")) {
- try {
- partition = Integer.parseInt(parts[i].substring("partition_".length()));
- break;
- } catch (NumberFormatException e) {
- LOGGER.warn("Failed to parse partition number from {}", parts[i]);
- }
- }
- }
-
- if (partition != -1) {
- // Find the dataset resource from our map
- DatasetResource datasetResource = datasetResourceMap.get(datasetId);
- if (datasetResource != null) {
- try {
- // Access the datasetPrimaryOpTrackers map through reflection
- Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
- opTrackersField.setAccessible(true);
- @SuppressWarnings("unchecked")
- Map<Integer, ILSMOperationTracker> opTrackers =
- (Map<Integer, ILSMOperationTracker>) opTrackersField.get(datasetResource);
-
- // Manual registration - simulate what DatasetLifecycleManager would do
- if (opTrackers != null && !opTrackers.containsKey(partition)) {
- LOGGER.debug(
- "Directly adding tracker to datasetPrimaryOpTrackers for dataset {}, partition {}: {}",
- datasetId, partition, mockTracker);
- opTrackers.put(partition, mockTracker);
- }
- } catch (Exception e) {
- LOGGER.error("Failed to access datasetPrimaryOpTrackers via reflection", e);
- }
- }
- }
-
- return mockTracker;
- }
-
- @Override
- public ILSMIOOperationCallback getIOOperationCallback() {
- return null;
- }
-
- @Override
- public List<ILSMDiskComponent> getDiskComponents() {
- return List.of();
- }
-
- @Override
- public boolean isPrimaryIndex() {
- return false;
- }
-
- @Override
- public void modify(IIndexOperationContext ictx, ITupleReference tuple) throws HyracksDataException {
-
- }
-
- @Override
- public void search(ILSMIndexOperationContext ictx, IIndexCursor cursor, ISearchPredicate pred)
- throws HyracksDataException {
-
- }
-
- @Override
- public void scanDiskComponents(ILSMIndexOperationContext ctx, IIndexCursor cursor) throws HyracksDataException {
-
- }
-
- @Override
- public ILSMIOOperation createFlushOperation(ILSMIndexOperationContext ctx) throws HyracksDataException {
- return null;
- }
-
- @Override
- public ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException {
- return null;
- }
-
- @Override
- public ILSMIOOperation createMergeOperation(ILSMIndexOperationContext ctx) throws HyracksDataException {
- return null;
- }
-
- @Override
- public ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException {
- return null;
- }
-
- @Override
- public void addDiskComponent(ILSMDiskComponent index) throws HyracksDataException {
-
- }
-
- @Override
- public void addBulkLoadedDiskComponent(ILSMDiskComponent c) throws HyracksDataException {
-
- }
-
- @Override
- public void subsumeMergedComponents(ILSMDiskComponent newComponent, List<ILSMComponent> mergedComponents)
- throws HyracksDataException {
-
- }
-
- @Override
- public void changeMutableComponent() {
-
- }
-
- @Override
- public void changeFlushStatusForCurrentMutableCompoent(boolean needsFlush) {
-
- }
-
- @Override
- public boolean hasFlushRequestForCurrentMutableComponent() {
- return false;
- }
-
- @Override
- public void getOperationalComponents(ILSMIndexOperationContext ctx) throws HyracksDataException {
-
- }
-
- @Override
- public List<ILSMDiskComponent> getInactiveDiskComponents() {
- return List.of();
- }
-
- @Override
- public void addInactiveDiskComponent(ILSMDiskComponent diskComponent) {
-
- }
-
- @Override
- public List<ILSMMemoryComponent> getInactiveMemoryComponents() {
- return List.of();
- }
-
- @Override
- public void addInactiveMemoryComponent(ILSMMemoryComponent memoryComponent) {
-
- }
-
- // Additional method implementations required by the interface
- // would be implemented here with minimal functionality
-
- /**
- * Sets the operation tracker for this mock index.
- * Added to support DatasetLifecycleManagerLazyRecoveryTest use cases.
- */
- public void setOperationTracker(PrimaryIndexOperationTracker tracker) {
- this.mockTracker = tracker;
- }
- }
-
- /**
- * Enhanced version of the MockIndex implementation that provides better
- * tracking of operation tracker state changes.
- */
- private static class EnhancedMockIndex extends MockIndex {
- private PrimaryIndexOperationTracker assignedTracker;
- private final List<String> trackerEvents = new ArrayList<>();
-
- public EnhancedMockIndex(String resourcePath, int datasetId, Map<Integer, DatasetResource> datasetResourceMap) {
- super(resourcePath, datasetId, datasetResourceMap);
- }
-
- @Override
- public ILSMOperationTracker getOperationTracker() {
- PrimaryIndexOperationTracker tracker = (PrimaryIndexOperationTracker) super.getOperationTracker();
-
- if (assignedTracker == null) {
- assignedTracker = tracker;
- trackerEvents.add("Initial tracker assignment: " + tracker);
- } else if (assignedTracker != tracker) {
- trackerEvents.add("Tracker changed from " + assignedTracker + " to " + tracker);
- assignedTracker = tracker;
- }
-
- return tracker;
- }
-
- public List<String> getTrackerEvents() {
- return trackerEvents;
- }
- }
-
- /**
- * Tests specific race conditions that could lead to null operation trackers during unregistration.
- * This test verifies:
- * 1. Concurrent registration, opening, and unregistration don't create null operation trackers
- * 2. The operation tracker remains valid throughout concurrent operations
- * 3. Operation tracker cleanup occurs only when appropriate
- */
- @Test
- public void testRaceConditionsWithOperationTracker() throws Exception {
- LOGGER.info("Starting testRaceConditionsWithOperationTracker");
-
- // Create a resource path for testing
- String resourcePath = createdResourcePaths.get(0);
-
- // Get dataset and partition information for this resource
- LocalResource resource = mockResources.get(resourcePath);
- DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
- int datasetId = datasetLocalResource.getDatasetId();
- int partition = datasetLocalResource.getPartition();
-
- // Setup an enhanced mock index for this test to track operation tracker events
- EnhancedMockIndex enhancedMockIndex = new EnhancedMockIndex(resourcePath, datasetId, datasetResourceMap);
- mockIndexes.put(resourcePath, enhancedMockIndex);
-
- // Register the resource to make it available
- IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, enhancedMockIndex);
- assertNotNull("Index should be registered successfully", index);
- assertSame("Should get our enhanced mock index", enhancedMockIndex, index);
-
- // Create multiple threads that will perform random operations on the same resource
- final int NUM_CONCURRENT_THREADS = 10;
- final int OPERATIONS_PER_THREAD = 20;
- final CyclicBarrier startBarrier = new CyclicBarrier(NUM_CONCURRENT_THREADS);
- final CountDownLatch completionLatch = new CountDownLatch(NUM_CONCURRENT_THREADS);
-
- for (int i = 0; i < NUM_CONCURRENT_THREADS; i++) {
- final int threadId = i;
- executorService.submit(() -> {
- try {
- startBarrier.await();
- Random random = new Random();
-
- for (int op = 0; op < OPERATIONS_PER_THREAD; op++) {
- // Perform a random operation:
- // 0 = check index existence
- // 1 = open
- // 2 = close
- // 3 = register
- // 4 = check operation tracker
- int operation = random.nextInt(5);
-
- switch (operation) {
- case 0:
- // Check if index exists
- IIndex existingIndex = datasetLifecycleManager.get(resourcePath);
- LOGGER.debug("Thread {} checked index existence: {}", threadId, existingIndex != null);
- break;
- case 1:
- // Open the index
- try {
- datasetLifecycleManager.open(resourcePath);
- LOGGER.debug("Thread {} opened index", threadId);
- } catch (Exception e) {
- // This is expected occasionally since the resource might be unregistered
- LOGGER.debug("Thread {} failed to open index: {}", threadId, e.getMessage());
- }
- break;
- case 2:
- // Close the index
- try {
- datasetLifecycleManager.close(resourcePath);
- LOGGER.debug("Thread {} closed index", threadId);
- } catch (Exception e) {
- // This is expected occasionally
- LOGGER.debug("Thread {} failed to close index: {}", threadId, e.getMessage());
- }
- break;
- case 3:
- // Register or re-register the index
- try {
- IIndex reregisteredIndex =
- datasetLifecycleManager.registerIfAbsent(resourcePath, enhancedMockIndex);
- LOGGER.debug("Thread {} registered index: {}", threadId, reregisteredIndex != null);
- } catch (Exception e) {
- LOGGER.debug("Thread {} failed to register index: {}", threadId, e.getMessage());
- }
- break;
- case 4:
- // Check operation tracker
- try {
- // Get the dataset resource
- DatasetResource datasetResource = datasetResourceMap.get(datasetId);
- if (datasetResource != null) {
- // Get and verify the operation tracker
- ILSMOperationTracker tracker = getOperationTracker(datasetResource, partition);
- if (tracker instanceof PrimaryIndexOperationTracker) {
- LOGGER.debug("Thread {} found valid PrimaryIndexOperationTracker: {}",
- threadId, tracker);
- } else if (tracker != null) {
- LOGGER.warn("Thread {} found non-PrimaryIndexOperationTracker: {}",
- threadId, tracker.getClass().getName());
- } else {
- LOGGER.debug("Thread {} found no operation tracker", threadId);
- }
- }
- } catch (Exception e) {
- LOGGER.debug("Thread {} error checking operation tracker: {}", threadId,
- e.getMessage());
- }
- break;
- }
-
- // Small delay to increase chance of race conditions
- Thread.sleep(random.nextInt(5));
- }
-
- LOGGER.debug("Thread {} completed all operations", threadId);
- completionLatch.countDown();
- } catch (Exception e) {
- LOGGER.error("Thread {} error during test: {}", threadId, e.getMessage(), e);
- completionLatch.countDown();
- }
- });
- }
-
- // Wait for all threads to complete
- boolean allCompleted = completionLatch.await(30, TimeUnit.SECONDS);
- if (!allCompleted) {
- LOGGER.warn("Not all threads completed in time");
- }
-
- // Get the tracker events to verify behavior
- List<String> trackerEvents = enhancedMockIndex.getTrackerEvents();
- LOGGER.debug("Tracker events recorded: {}", trackerEvents.size());
- for (String event : trackerEvents) {
- LOGGER.debug("Tracker event: {}", event);
- }
-
- // Verify the index is still registered at the end
- IIndex finalIndex = datasetLifecycleManager.get(resourcePath);
- if (finalIndex != null) {
- // Get the final operation tracker state if the index is still registered
- DatasetResource datasetResource = datasetResourceMap.get(datasetId);
- if (datasetResource != null) {
- ILSMOperationTracker finalTracker = getOperationTracker(datasetResource, partition);
- LOGGER.debug("Final operation tracker state: {}", finalTracker);
-
- // If there's a valid tracker, it should be a PrimaryIndexOperationTracker
- if (finalTracker != null) {
- assertTrue("Final tracker should be a PrimaryIndexOperationTracker",
- finalTracker instanceof PrimaryIndexOperationTracker);
- }
- }
- }
-
- LOGGER.info("Completed testRaceConditionsWithOperationTracker");
- }
-
- // Helper method to ensure consistent register approach
- public IIndex registerIfAbsentIndex(String resourcePath) throws HyracksDataException {
- MockIndex mockIndex = mockIndexes.get(resourcePath);
- IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, mockIndex);
- LOGGER.debug("Registered {} with mock index {}", resourcePath, mockIndex);
- return index;
- }
-
- @Test
- public void testBalancedOpenCloseUnregister() throws Exception {
- LOGGER.info("Starting testBalancedOpenCloseUnregister");
-
- // Select a resource to test with
- String resourcePath = createdResourcePaths.get(0);
-
- // Register the index
- IIndex index = registerIfAbsentIndex(resourcePath);
- assertNotNull("Index should be registered successfully", index);
- LOGGER.info("Registered index: {}", resourcePath);
-
- // Number of times to open/close
- final int numOperations = 5;
-
- // Open the index multiple times
- for (int i = 0; i < numOperations; i++) {
- datasetLifecycleManager.open(resourcePath);
- LOGGER.info("Opened index {} (#{}/{})", resourcePath, i + 1, numOperations);
- }
-
- // Close the index the same number of times
- for (int i = 0; i < numOperations; i++) {
- datasetLifecycleManager.close(resourcePath);
- LOGGER.info("Closed index {} (#{}/{})", resourcePath, i + 1, numOperations);
- }
-
- // At this point, reference count should be balanced and unregistration should succeed
- try {
- datasetLifecycleManager.unregister(resourcePath);
- LOGGER.info("Successfully unregistered index {} after balanced open/close operations", resourcePath);
-
- // After unregistration, simulate real behavior by making the repository return null for this path
- // This is what would happen in a real environment - the resource would be removed from storage
- mockResources.remove(resourcePath);
- LOGGER.info("Removed resource {} from mock resources", resourcePath);
- } catch (HyracksDataException e) {
- fail("Failed to unregister index after balanced open/close: " + e.getMessage());
- }
-
- // Verify the index is actually unregistered by attempting to get it
- IIndex retrievedIndex = datasetLifecycleManager.get(resourcePath);
- assertNull("Index should no longer be registered after unregistration", retrievedIndex);
-
- // Verify the resource is no longer in our mock repository
- LocalResource retrievedResource = mockResourceRepository.get(resourcePath);
- assertNull("Resource should be removed from repository after unregistration", retrievedResource);
-
- // Try to open the unregistered index - should fail
- boolean openFailed = false;
- try {
- datasetLifecycleManager.open(resourcePath);
- } catch (HyracksDataException e) {
- openFailed = true;
- LOGGER.info("As expected, failed to open unregistered index: {}", e.getMessage());
- assertTrue("Error should indicate the index doesn't exist",
- e.getMessage().contains("Index does not exist"));
- }
- assertTrue("Opening an unregistered index should fail", openFailed);
- }
-
- // Add these accessor methods at the end of the class
-
- /**
- * Returns the dataset lifecycle manager for testing
- */
- public DatasetLifecycleManager getDatasetLifecycleManager() {
- return datasetLifecycleManager;
- }
-
- /**
- * Returns the mock service context for testing
- */
- public INCServiceContext getMockServiceContext() {
- return mockServiceContext;
- }
-
- /**
- * Returns the list of created resource paths
- */
- public List<String> getCreatedResourcePaths() {
- return createdResourcePaths;
- }
-
- /**
- * Returns the map of mock resources
- */
- public Map<String, LocalResource> getMockResources() {
- return mockResources;
- }
-
- /**
- * Returns the mock recovery manager
- */
- public IRecoveryManager getMockRecoveryManager() {
- return mockRecoveryManager;
- }
-
- /**
- * Returns the mock index for a path
- */
- public MockIndex getMockIndex(String resourcePath) {
- return mockIndexes.get(resourcePath);
- }
-
- /**
- * Returns the mock resource repository
- */
- public ILocalResourceRepository getMockResourceRepository() {
- return mockResourceRepository;
- }
-
- /**
- * Creates a mock index for a resource path and adds it to the mockIndexes map.
- * This is used by DatasetLifecycleManagerLazyRecoveryTest to ensure consistent mock creation.
- */
- public void createMockIndexForPath(String resourcePath, int datasetId, int partition,
- PrimaryIndexOperationTracker opTracker) throws HyracksDataException {
- // Extract needed dataset ID from the path or use provided one
- MockIndex mockIndex = new MockIndex(resourcePath, datasetId, datasetResourceMap);
-
- // Set the operation tracker directly if provided
- if (opTracker != null) {
- mockIndex.setOperationTracker(opTracker);
- }
-
- // Store the mock index
- mockIndexes.put(resourcePath, mockIndex);
-
- // Create an answer that returns this mock index
- DatasetLocalResource mockDatasetResource = null;
- for (Map.Entry<String, LocalResource> entry : mockResources.entrySet()) {
- if (entry.getKey().equals(resourcePath)) {
- mockDatasetResource = (DatasetLocalResource) entry.getValue().getResource();
- break;
- }
- }
-
- if (mockDatasetResource != null) {
- when(mockDatasetResource.createInstance(any())).thenReturn(mockIndex);
- }
-
- LOGGER.debug("Created mock index for {}: {}", resourcePath, mockIndex);
- }
-}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/common/context/DatasetLifecycleManagerLazyRecoveryTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/common/context/DatasetLifecycleManagerLazyRecoveryTest.java
deleted file mode 100644
index a4b5a55..0000000
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/common/context/DatasetLifecycleManagerLazyRecoveryTest.java
+++ /dev/null
@@ -1,1223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.context;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-import org.apache.asterix.common.dataflow.DatasetLocalResource;
-import org.apache.asterix.common.transactions.IRecoveryManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.common.IIndex;
-import org.apache.hyracks.storage.common.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.LocalResource;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Tests the behavior of the DatasetLifecycleManager with lazy recovery enabled.
- * This test class focuses on verifying that lazy recovery works correctly.
- */
-public class DatasetLifecycleManagerLazyRecoveryTest {
- private static final Logger LOGGER = LogManager.getLogger();
-
- private DatasetLifecycleManagerConcurrentTest concurrentTest;
- private List<String> recoveredResourcePaths;
- private Set<String> datasetPartitionPaths;
- private Map<String, FileReference> pathToFileRefMap;
- private ExecutorService executorService;
-
- @Before
- public void setUp() throws Exception {
- // Create and initialize the base test
- concurrentTest = new DatasetLifecycleManagerConcurrentTest();
- concurrentTest.setUp();
-
- // Enable lazy recovery
- concurrentTest.setLazyRecoveryEnabled(true);
- LOGGER.info("Lazy recovery enabled for all tests");
-
- // Track recovered resources
- recoveredResourcePaths = new ArrayList<>();
- datasetPartitionPaths = new TreeSet<>();
- pathToFileRefMap = new HashMap<>();
-
- // Extract the dataset partition paths from the resource paths and log partition info
- // These are the parent directories that contain multiple index files
- // Format: storage/partition_X/DBName/ScopeName/DatasetName/ReplicationFactor/IndexName
- // Note: The replication factor is ALWAYS 0 in production and should be 0 in all tests
- Pattern pattern = Pattern.compile("(.*?/partition_(\\d+)/[^/]+/[^/]+/[^/]+/(\\d+))/.+");
-
- // Map to collect resources by partition for logging
- Map<Integer, List<String>> resourcesByPartition = new HashMap<>();
-
- for (String resourcePath : concurrentTest.getCreatedResourcePaths()) {
- Matcher matcher = pattern.matcher(resourcePath);
- if (matcher.matches()) {
- String partitionPath = matcher.group(1); // Now includes the replication factor
- int partitionId = Integer.parseInt(matcher.group(2));
- int replicationFactor = Integer.parseInt(matcher.group(3)); // Usually 0
-
- LOGGER.info("Resource path: {}", resourcePath);
- LOGGER.info(" Partition path: {}", partitionPath);
- LOGGER.info(" Partition ID: {}", partitionId);
- LOGGER.info(" Replication factor: {}", replicationFactor);
-
- // Track resources by partition for logging
- resourcesByPartition.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(resourcePath);
-
- // Add to our partition paths for mocking
- datasetPartitionPaths.add(partitionPath);
-
- // Create a FileReference mock for this partition path
- FileReference mockPartitionRef = mock(FileReference.class);
- when(mockPartitionRef.getAbsolutePath()).thenReturn(partitionPath);
- pathToFileRefMap.put(partitionPath, mockPartitionRef);
- } else {
- LOGGER.warn("Resource path doesn't match expected pattern: {}", resourcePath);
- }
- }
-
- // Log resources by partition for debugging
- LOGGER.info("Found {} dataset partition paths", datasetPartitionPaths.size());
- for (Map.Entry<Integer, List<String>> entry : resourcesByPartition.entrySet()) {
- LOGGER.info("Partition {}: {} resources", entry.getKey(), entry.getValue().size());
- for (String path : entry.getValue()) {
- LOGGER.info(" - {}", path);
- }
- }
-
- // Mock the IOManager.resolve() method to return our mock FileReferences
- IIOManager mockIOManager = concurrentTest.getMockServiceContext().getIoManager();
- doAnswer(inv -> {
- String path = inv.getArgument(0);
- // For each possible partition path, check if this is the path being resolved
- for (String partitionPath : datasetPartitionPaths) {
- if (path.equals(partitionPath)) {
- LOGGER.info("Resolving path {} to FileReference", path);
- return pathToFileRefMap.get(partitionPath);
- }
- }
- // If no match, create a new mock FileReference
- FileReference mockFileRef = mock(FileReference.class);
- when(mockFileRef.getAbsolutePath()).thenReturn(path);
- when(mockFileRef.toString()).thenReturn(path);
- return mockFileRef;
- }).when(mockIOManager).resolve(anyString());
-
- // Create executor service for concurrent tests
- executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
- }
-
- /**
- * Tests that when an index is opened with lazy recovery enabled,
- * all resources under the same dataset partition path are recovered together.
- * This verifies the batch recovery behavior for all resources with the same prefix path.
- */
- @Test
- public void testLazyRecoveryForEntirePartition() throws Exception {
- LOGGER.info("Starting testLazyRecoveryForEntirePartition");
-
- // Get resources for one dataset partition
- String datasetPartitionPath = datasetPartitionPaths.iterator().next();
- List<String> resourcesInPartition = concurrentTest.getCreatedResourcePaths().stream()
- .filter(path -> path.startsWith(datasetPartitionPath)).collect(Collectors.toList());
-
- LOGGER.info("Testing partition path: {}", datasetPartitionPath);
- LOGGER.info("Resources in this partition: {}", resourcesInPartition);
-
- // Mock the resource repository to track calls to getResources
- ILocalResourceRepository mockRepository = concurrentTest.getMockResourceRepository();
-
- // Setup capture of resources that get recovered - using FileReference
- doAnswer(inv -> {
- List<FileReference> refs = inv.getArgument(1);
- if (refs != null && !refs.isEmpty()) {
- FileReference fileRef = refs.get(0);
- String rootPath = fileRef.getAbsolutePath();
- LOGGER.info("Looking up resources with FileReference: {}", fileRef);
- LOGGER.info("FileReference absolute path: {}", rootPath);
-
- // The rootPath from DatasetLifecycleManager now includes the replication factor
- // e.g., storage/partition_1/Default/SDefault/ds0/0
- LOGGER.info("Using corrected rootPath for resource lookup: {}", rootPath);
-
- // Get all resources that match this root path
- Map<Long, LocalResource> matchingResources = new HashMap<>();
-
- // Convert our string-keyed map to long-keyed map as expected by the API
- // Note: We need to match exactly, not just prefix, because the rootPath now includes the replication factor
- concurrentTest.getMockResources().entrySet().stream()
- .filter(entry -> entry.getKey().equals(rootPath) || entry.getKey().startsWith(rootPath + "/"))
- .forEach(entry -> {
- LocalResource resource = entry.getValue();
- matchingResources.put(resource.getId(), resource);
- // Record which resources were recovered for our test verification
- recoveredResourcePaths.add(entry.getKey());
- LOGGER.info("Including resource for recovery: {} (ID: {})", entry.getKey(),
- resource.getId());
- });
-
- LOGGER.info("Found {} resources to recover", matchingResources.size());
-
- // Return resources mapped by ID as the real implementation would
- return matchingResources;
- }
- return Map.of();
- }).when(mockRepository).getResources(any(), anyList());
-
- // Setup capture for recoverIndexes calls
- doAnswer(inv -> {
- List<ILSMIndex> indexes = inv.getArgument(0);
- if (indexes != null && !indexes.isEmpty()) {
- LOGGER.info("Recovering {} indexes", indexes.size());
- for (ILSMIndex index : indexes) {
- LOGGER.info("Recovering index: {}", index);
- }
- }
- return null;
- }).when(concurrentTest.getMockRecoveryManager()).recoverIndexes(anyList());
-
- // Choose the first resource in the partition to register and open
- String resourcePath = resourcesInPartition.get(0);
-
- // Register the index
- IIndex index = concurrentTest.registerIfAbsentIndex(resourcePath);
- assertNotNull("Index should be registered successfully", index);
- LOGGER.info("Successfully registered index at {}", resourcePath);
-
- // Open the index - should trigger lazy recovery
- concurrentTest.getDatasetLifecycleManager().open(resourcePath);
- LOGGER.info("Successfully opened index at {}", resourcePath);
-
- // Verify the recovery manager was consulted
- verify(concurrentTest.getMockRecoveryManager(), atLeastOnce()).isLazyRecoveryEnabled();
-
- // Verify that all resources in the partition were considered for recovery
- LOGGER.info("Verifying recovery for resources in partition: {}", resourcesInPartition);
- LOGGER.info("Resources marked as recovered: {}", recoveredResourcePaths);
-
- for (String path : resourcesInPartition) {
- boolean wasRecovered = isResourcePathRecovered(path);
- LOGGER.info("Resource {} - recovered: {}", path, wasRecovered);
- assertTrue("Resource should have been considered for recovery: " + path, wasRecovered);
- }
-
- // Cleanup
- concurrentTest.getDatasetLifecycleManager().close(resourcePath);
- concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
- concurrentTest.getMockResources().remove(resourcePath);
-
- // Verify cleanup was successful
- assertNull("Index should no longer be registered",
- concurrentTest.getDatasetLifecycleManager().get(resourcePath));
- }
-
- /**
- * Tests that lazy recovery is only performed the first time an index is opened
- * and is skipped on subsequent opens.
- */
- @Test
- public void testLazyRecoveryOnlyHappensOnce() throws Exception {
- LOGGER.info("Starting testLazyRecoveryOnlyHappensOnce");
-
- // Get a resource path
- String resourcePath = concurrentTest.getCreatedResourcePaths().get(0);
- LOGGER.info("Testing with resource: {}", resourcePath);
-
- // Find the dataset partition path for this resource
- String partitionPath = null;
- for (String path : datasetPartitionPaths) {
- if (resourcePath.startsWith(path)) {
- partitionPath = path;
- break;
- }
- }
- LOGGER.info("Resource belongs to partition: {}", partitionPath);
-
- // Track recovery calls
- List<String> recoveredPaths = new ArrayList<>();
-
- // Mock repository to track resource lookups during recovery
- final Map<Integer, Integer> recoveryCount = new HashMap<>();
- recoveryCount.put(0, 0); // First recovery count
- recoveryCount.put(1, 0); // Second recovery count
-
- ILocalResourceRepository mockRepository = concurrentTest.getMockResourceRepository();
-
- // Setup capture of resources that get recovered
- doAnswer(inv -> {
- List<FileReference> refs = inv.getArgument(1);
- if (refs != null && !refs.isEmpty()) {
- FileReference fileRef = refs.get(0);
- String rootPath = fileRef.getAbsolutePath();
-
- // Get all resources that match this root path
- Map<Long, LocalResource> matchingResources = new HashMap<>();
-
- concurrentTest.getMockResources().entrySet().stream()
- .filter(entry -> entry.getKey().startsWith(rootPath)).forEach(entry -> {
- LocalResource resource = entry.getValue();
- matchingResources.put(resource.getId(), resource);
- recoveredPaths.add(entry.getKey());
- });
-
- // Increment recovery counter based on which open we're on
- if (recoveryCount.get(1) == 0) {
- recoveryCount.put(0, recoveryCount.get(0) + matchingResources.size());
- LOGGER.info("First open recovery finding {} resources", matchingResources.size());
- } else {
- recoveryCount.put(1, recoveryCount.get(1) + matchingResources.size());
- LOGGER.info("Second open recovery finding {} resources", matchingResources.size());
- }
-
- return matchingResources;
- }
- return Map.of();
- }).when(mockRepository).getResources(any(), anyList());
-
- // Mock recovery behavior to track calls
- doAnswer(inv -> {
- List<ILSMIndex> indexes = inv.getArgument(0);
- if (indexes != null && !indexes.isEmpty()) {
- for (ILSMIndex index : indexes) {
- String path = index.toString();
- LOGGER.info("Recovering index: {}", path);
- }
- }
- return null;
- }).when(concurrentTest.getMockRecoveryManager()).recoverIndexes(anyList());
-
- // Register the index
- IIndex index = concurrentTest.registerIfAbsentIndex(resourcePath);
- assertNotNull("Index should be registered successfully", index);
-
- // First open - should trigger recovery
- LOGGER.info("First open of index");
- concurrentTest.getDatasetLifecycleManager().open(resourcePath);
- LOGGER.info("First open recovery found {} resources", recoveryCount.get(0));
-
- // Close the index
- concurrentTest.getDatasetLifecycleManager().close(resourcePath);
-
- // Second open - should NOT trigger recovery again
- LOGGER.info("Second open of index");
- recoveryCount.put(1, 0); // Reset second recovery counter
- concurrentTest.getDatasetLifecycleManager().open(resourcePath);
- LOGGER.info("Second open recovery found {} resources", recoveryCount.get(1));
-
- // Verify recovery was skipped on second open
- assertTrue("First open should recover resources", recoveryCount.get(0) > 0);
- assertTrue("Recovery should not happen on subsequent opens",
- recoveryCount.get(1) == 0 || recoveryCount.get(1) < recoveryCount.get(0));
-
- // Cleanup
- concurrentTest.getDatasetLifecycleManager().close(resourcePath);
- concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
- concurrentTest.getMockResources().remove(resourcePath);
- }
-
- /**
- * A comprehensive test that runs multiple operations with lazy recovery enabled.
- * This test verifies that lazy recovery works correctly across various scenarios:
- * - Opening multiple resources in the same partition
- * - Operations across multiple partitions
- * - Reference counting with multiple open/close
- * - Proper unregistration after use
- */
- @Test
- public void testComprehensiveLazyRecoveryWithMultipleResources() throws Exception {
- LOGGER.info("Starting testComprehensiveLazyRecoveryWithMultipleResources");
-
- // Create sets to track which resources have been recovered
- Set<String> recoveredResources = new TreeSet<>();
- Set<String> recoveredPartitions = new TreeSet<>();
-
- // Mock repository to track resource lookups during recovery
- ILocalResourceRepository mockRepository = concurrentTest.getMockResourceRepository();
-
- // Setup capture of resources that get recovered
- doAnswer(inv -> {
- List<FileReference> refs = inv.getArgument(1);
- if (refs != null && !refs.isEmpty()) {
- FileReference fileRef = refs.get(0);
- String rootPath = fileRef.getAbsolutePath();
- LOGGER.info("Looking up resources with root path: {}", rootPath);
-
- // Track this partition as being recovered
- recoveredPartitions.add(rootPath);
-
- // Get all resources that match this root path
- Map<Long, LocalResource> matchingResources = new HashMap<>();
-
- concurrentTest.getMockResources().entrySet().stream()
- .filter(entry -> entry.getKey().startsWith(rootPath)).forEach(entry -> {
- LocalResource resource = entry.getValue();
- matchingResources.put(resource.getId(), resource);
- recoveredResources.add(entry.getKey());
- LOGGER.info("Including resource for recovery: {} (ID: {})", entry.getKey(),
- resource.getId());
- });
-
- return matchingResources;
- }
- return Map.of();
- }).when(mockRepository).getResources(any(), anyList());
-
- // Group resources by partition for easier testing
- Map<String, List<String>> resourcesByPartition = new HashMap<>();
-
- for (String partitionPath : datasetPartitionPaths) {
- List<String> resourcesInPartition = concurrentTest.getCreatedResourcePaths().stream()
- .filter(path -> path.startsWith(partitionPath)).collect(Collectors.toList());
-
- resourcesByPartition.put(partitionPath, resourcesInPartition);
- }
-
- LOGGER.info("Found {} partitions with resources", resourcesByPartition.size());
-
- // For each partition, perform a series of operations
- for (Map.Entry<String, List<String>> entry : resourcesByPartition.entrySet()) {
- String partitionPath = entry.getKey();
- List<String> resources = entry.getValue();
-
- LOGGER.info("Testing partition: {} with {} resources", partitionPath, resources.size());
-
- // 1. Register and open the first resource - should trigger recovery for all resources in this partition
- String firstResource = resources.get(0);
- LOGGER.info("Registering and opening first resource: {}", firstResource);
-
- IIndex firstIndex = concurrentTest.registerIfAbsentIndex(firstResource);
- assertNotNull("First index should be registered successfully", firstIndex);
-
- // Open the index - should trigger lazy recovery for all resources in this partition
- concurrentTest.getDatasetLifecycleManager().open(firstResource);
-
- // Verify partition was recovered
- assertTrue("Partition should be marked as recovered", recoveredPartitions.contains(partitionPath));
-
- // 2. Register and open a second resource if available - should NOT trigger recovery again
- if (resources.size() > 1) {
- // Clear recovery tracking before opening second resource
- int beforeSize = recoveredResources.size();
- String secondResource = resources.get(1);
-
- LOGGER.info("Registering and opening second resource: {}", secondResource);
- IIndex secondIndex = concurrentTest.registerIfAbsentIndex(secondResource);
- assertNotNull("Second index should be registered successfully", secondIndex);
-
- // Open the second index - should NOT trigger lazy recovery again for this partition
- concurrentTest.getDatasetLifecycleManager().open(secondResource);
-
- // Verify no additional recovery happened
- int afterSize = recoveredResources.size();
- LOGGER.info("Resources recovered before: {}, after: {}", beforeSize, afterSize);
- // We might see some recovery of the specific resource, but not the whole partition again
-
- // 3. Open and close the second resource multiple times to test reference counting
- for (int i = 0; i < 3; i++) {
- concurrentTest.getDatasetLifecycleManager().close(secondResource);
- concurrentTest.getDatasetLifecycleManager().open(secondResource);
- }
-
- // Close and unregister the second resource
- concurrentTest.getDatasetLifecycleManager().close(secondResource);
- concurrentTest.getDatasetLifecycleManager().unregister(secondResource);
- LOGGER.info("Successfully closed and unregistered second resource: {}", secondResource);
- }
-
- // 4. Close and unregister the first resource
- concurrentTest.getDatasetLifecycleManager().close(firstResource);
- concurrentTest.getDatasetLifecycleManager().unregister(firstResource);
- LOGGER.info("Successfully closed and unregistered first resource: {}", firstResource);
-
- // Verify both resources are truly unregistered
- for (String resource : resources.subList(0, Math.min(2, resources.size()))) {
- assertNull("Resource should be unregistered: " + resource,
- concurrentTest.getDatasetLifecycleManager().get(resource));
- }
- }
-
- // Summary of recovery
- LOGGER.info("Recovered {} partitions out of {}", recoveredPartitions.size(), datasetPartitionPaths.size());
- LOGGER.info("Recovered {} resources out of {}", recoveredResources.size(),
- concurrentTest.getCreatedResourcePaths().size());
- }
-
- /**
- * Tests that open operation succeeds after unregister + register sequence.
- * This verifies the operation tracker is properly reset when re-registering.
- */
- @Test
- public void testUnregisterRegisterIfAbsentOpenSequence() throws Exception {
- LOGGER.info("Starting testUnregisterRegisterOpenSequence");
-
- // Get a resource path
- String resourcePath = concurrentTest.getCreatedResourcePaths().get(0);
- LOGGER.info("Testing with resource: {}", resourcePath);
-
- // First lifecycle - register, open, close, unregister
- IIndex firstIndex = concurrentTest.registerIfAbsentIndex(resourcePath);
- assertNotNull("Index should be registered successfully in first lifecycle", firstIndex);
-
- concurrentTest.getDatasetLifecycleManager().open(resourcePath);
- LOGGER.info("Successfully opened index in first lifecycle");
-
- concurrentTest.getDatasetLifecycleManager().close(resourcePath);
- concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
- LOGGER.info("Successfully closed and unregistered index in first lifecycle");
-
- // Second lifecycle - register again, open, close, unregister
- IIndex secondIndex = concurrentTest.registerIfAbsentIndex(resourcePath);
- assertNotNull("Index should be registered successfully in second lifecycle", secondIndex);
-
- // This open should succeed if the operation tracker was properly reset
- try {
- concurrentTest.getDatasetLifecycleManager().open(resourcePath);
- LOGGER.info("Successfully opened index in second lifecycle");
- } catch (Exception e) {
- LOGGER.error("Failed to open index in second lifecycle", e);
- fail("Open should succeed in second lifecycle: " + e.getMessage());
- }
-
- // Cleanup
- concurrentTest.getDatasetLifecycleManager().close(resourcePath);
- concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
- LOGGER.info("Successfully completed two full lifecycles with the same resource");
- }
-
- /**
- * Tests that recovery works correctly across different partitions.
- * This test specifically verifies that partition IDs are correctly propagated
- * during the recovery process.
- */
- @Test
- public void testRecoveryAcrossPartitions() throws Exception {
- LOGGER.info("Starting testRecoveryAcrossPartitions");
-
- // Skip test if we don't have at least 2 partitions
- if (datasetPartitionPaths.size() < 2) {
- LOGGER.info("Skipping testRecoveryAcrossPartitions - need at least 2 partitions");
- return;
- }
-
- // Create maps to track recovery by partition
- Map<Integer, Boolean> partitionRecovered = new HashMap<>();
- Map<Integer, Set<String>> resourcesRecoveredByPartition = new HashMap<>();
-
- // Find resources from different partitions
- Map<Integer, String> resourcesByPartition = new HashMap<>();
- // Format: storage/partition_X/DBName/ScopeName/DatasetName/ReplicationFactor/IndexName
- Pattern pattern = Pattern.compile(".*?/partition_(\\d+)/.+");
-
- for (String resourcePath : concurrentTest.getCreatedResourcePaths()) {
- Matcher matcher = pattern.matcher(resourcePath);
- if (matcher.matches()) {
- int partitionId = Integer.parseInt(matcher.group(1));
- resourcesByPartition.putIfAbsent(partitionId, resourcePath);
- partitionRecovered.put(partitionId, false);
- resourcesRecoveredByPartition.put(partitionId, new TreeSet<>());
- }
- }
-
- LOGGER.info("Found resources in {} different partitions: {}", resourcesByPartition.size(),
- resourcesByPartition.keySet());
-
- // Mock repository to track which partition's resources are being recovered
- ILocalResourceRepository mockRepository = concurrentTest.getMockResourceRepository();
-
- // Setup capture of resources that get recovered
- doAnswer(inv -> {
- List<FileReference> refs = inv.getArgument(1);
- if (refs != null && !refs.isEmpty()) {
- FileReference fileRef = refs.get(0);
- String rootPath = fileRef.getAbsolutePath();
- LOGGER.info("Looking up resources with root path: {}", rootPath);
-
- // The rootPath now includes the replication factor
- LOGGER.info("Using corrected rootPath for resource lookup: {}", rootPath);
-
- // Get the partition ID from the path
- Matcher matcher = Pattern.compile(".*?/partition_(\\d+)/.+").matcher(rootPath);
- if (matcher.matches()) {
- int partitionId = Integer.parseInt(matcher.group(1));
- partitionRecovered.put(partitionId, true);
- LOGGER.info("Recovery requested for partition {}", partitionId);
- } else {
- LOGGER.warn("Could not extract partition ID from path: {}", rootPath);
- }
-
- // Get all resources that match this root path
- Map<Long, LocalResource> matchingResources = new HashMap<>();
-
- concurrentTest.getMockResources().entrySet().stream()
- .filter(entry -> entry.getKey().equals(rootPath) || entry.getKey().startsWith(rootPath + "/"))
- .forEach(entry -> {
- LocalResource resource = entry.getValue();
- matchingResources.put(resource.getId(), resource);
-
- // Extract partition ID from path
- Matcher m = Pattern.compile(".*?/partition_(\\d+)/.+").matcher(entry.getKey());
- if (m.matches()) {
- int partitionId = Integer.parseInt(m.group(1));
- resourcesRecoveredByPartition.get(partitionId).add(entry.getKey());
- LOGGER.info("Resource {} added to recovery for partition {}", entry.getKey(),
- partitionId);
- }
- });
-
- LOGGER.info("Found {} resources to recover", matchingResources.size());
- return matchingResources;
- }
- return Map.of();
- }).when(mockRepository).getResources(any(), anyList());
-
- // Open resources from each partition
- for (Map.Entry<Integer, String> entry : resourcesByPartition.entrySet()) {
- int partitionId = entry.getKey();
- String resourcePath = entry.getValue();
-
- LOGGER.info("Testing recovery for partition {} with resource {}", partitionId, resourcePath);
-
- // Register and open the resource
- IIndex index = concurrentTest.registerIfAbsentIndex(resourcePath);
- assertNotNull("Index should be registered successfully", index);
-
- // Get the LocalResource and verify its partition ID
- LocalResource localResource = concurrentTest.getMockResources().get(resourcePath);
- DatasetLocalResource datasetLocalResource = (DatasetLocalResource) localResource.getResource();
- int resourcePartitionId = datasetLocalResource.getPartition();
-
- LOGGER.info("Resource {} has partition ID {} (expected {})", resourcePath, resourcePartitionId,
- partitionId);
-
- // Open the index - should trigger lazy recovery for this partition
- concurrentTest.getDatasetLifecycleManager().open(resourcePath);
- LOGGER.info("Successfully opened index for partition {}", partitionId);
-
- // Verify this partition was recovered
- assertTrue("Partition " + partitionId + " should be marked as recovered",
- partitionRecovered.get(partitionId));
-
- // Verify resources for this partition were recovered
- Set<String> recoveredResources = resourcesRecoveredByPartition.get(partitionId);
- assertTrue("At least one resource should be recovered for partition " + partitionId,
- !recoveredResources.isEmpty());
-
- // Cleanup
- concurrentTest.getDatasetLifecycleManager().close(resourcePath);
- concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
- }
-
- // Summary of recovery by partition
- for (Map.Entry<Integer, Boolean> entry : partitionRecovered.entrySet()) {
- int partitionId = entry.getKey();
- boolean recovered = entry.getValue();
- int resourceCount = resourcesRecoveredByPartition.get(partitionId).size();
-
- LOGGER.info("Partition {}: Recovered={}, Resources recovered={}", partitionId, recovered, resourceCount);
- }
- }
-
- /**
- * Tests that the partition ID in DatasetLocalResource objects is correctly set.
- * This test specifically verifies that partition IDs match the resource path.
- */
- @Test
- public void testPartitionIdMatchesPath() throws Exception {
- LOGGER.info("Starting testPartitionIdMatchesPath");
-
- // Find resources from different partitions
- Map<Integer, Set<String>> resourcesByPartition = new HashMap<>();
- Pattern pattern = Pattern.compile(".*?/partition_(\\d+)/.+");
-
- // First, collect all resources by partition
- for (String resourcePath : concurrentTest.getCreatedResourcePaths()) {
- Matcher matcher = pattern.matcher(resourcePath);
- if (matcher.matches()) {
- int partitionId = Integer.parseInt(matcher.group(1));
- resourcesByPartition.computeIfAbsent(partitionId, k -> new TreeSet<>()).add(resourcePath);
- }
- }
-
- boolean incorrectPartitionIdFound = false;
-
- // Examine each resource and check its partition ID
- for (Map.Entry<Integer, Set<String>> entry : resourcesByPartition.entrySet()) {
- int expectedPartitionId = entry.getKey();
- Set<String> resources = entry.getValue();
-
- LOGGER.info("Checking {} resources for partition {}", resources.size(), expectedPartitionId);
-
- for (String resourcePath : resources) {
- LocalResource localResource = concurrentTest.getMockResources().get(resourcePath);
- if (localResource != null) {
- DatasetLocalResource datasetLocalResource = (DatasetLocalResource) localResource.getResource();
- int actualPartitionId = datasetLocalResource.getPartition();
-
- LOGGER.info("Resource: {}, Expected partition: {}, Actual partition: {}", resourcePath,
- expectedPartitionId, actualPartitionId);
-
- if (expectedPartitionId != actualPartitionId) {
- LOGGER.error("PARTITION ID MISMATCH for resource {}! Expected: {}, Actual: {}", resourcePath,
- expectedPartitionId, actualPartitionId);
- incorrectPartitionIdFound = true;
- }
- } else {
- LOGGER.warn("Resource not found in mockResources: {}", resourcePath);
- }
- }
- }
-
- // This assertion will fail if any partition ID is incorrect
- assertFalse("Incorrect partition IDs found in resources", incorrectPartitionIdFound);
-
- // Now test DatasetLifecycleManager.getDatasetLocalResource directly
- LOGGER.info("Testing DatasetLifecycleManager.getDatasetLocalResource directly");
-
- // Find a resource in partition 1 (if available)
- Set<String> partition1Resources = resourcesByPartition.getOrDefault(1, Collections.emptySet());
- if (!partition1Resources.isEmpty()) {
- String resourcePath = partition1Resources.iterator().next();
- LOGGER.info("Testing getDatasetLocalResource with partition 1 resource: {}", resourcePath);
-
- // Register the resource
- IIndex index = concurrentTest.registerIfAbsentIndex(resourcePath);
- assertNotNull("Index should be registered successfully", index);
-
- // Directly access the DatasetLocalResource to check partition ID
- // We'll need to use reflection since getDatasetLocalResource is private
- try {
- // Get the private method
- Method getDatasetLocalResourceMethod =
- DatasetLifecycleManager.class.getDeclaredMethod("getDatasetLocalResource", String.class);
- getDatasetLocalResourceMethod.setAccessible(true);
-
- // Invoke the method
- DatasetLocalResource result = (DatasetLocalResource) getDatasetLocalResourceMethod
- .invoke(concurrentTest.getDatasetLifecycleManager(), resourcePath);
-
- assertNotNull("DatasetLocalResource should not be null", result);
- int actualPartitionId = result.getPartition();
- LOGGER.info("getDatasetLocalResource for {} returned partition ID: {}", resourcePath,
- actualPartitionId);
- assertEquals("Partition ID should be 1", 1, actualPartitionId);
- } catch (Exception e) {
- LOGGER.error("Error accessing getDatasetLocalResource method", e);
- fail("Failed to access getDatasetLocalResource method: " + e.getMessage());
- }
-
- // Clean up
- concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
- } else {
- LOGGER.info("No resources found in partition 1, skipping direct test");
- }
- }
-
- /**
- * Tests that the mock resources are correctly set up with the proper partition IDs.
- * This test directly examines the mockResources map to verify the mocks.
- */
- @Test
- public void testMockResourcesSetup() throws Exception {
- LOGGER.info("Starting testMockResourcesSetup");
-
- // Extract partition IDs from paths and compare with DatasetLocalResource partitions
- Pattern pattern = Pattern.compile(".*?/partition_(\\d+)/.+");
- int correctResources = 0;
- int incorrectResources = 0;
-
- LOGGER.info("Total mock resources: {}", concurrentTest.getMockResources().size());
-
- // Print out all setupMockResource calls that would be needed to recreate the resources
- for (Map.Entry<String, LocalResource> entry : concurrentTest.getMockResources().entrySet()) {
- String resourcePath = entry.getKey();
- LocalResource localResource = entry.getValue();
-
- // Extract the expected partition ID from the path
- Matcher matcher = pattern.matcher(resourcePath);
- if (matcher.matches()) {
- int expectedPartitionId = Integer.parseInt(matcher.group(1));
-
- // Get the actual partition ID from the resource
- Object resourceObj = localResource.getResource();
- if (resourceObj instanceof DatasetLocalResource) {
- DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resourceObj;
- int actualPartitionId = datasetLocalResource.getPartition();
- int datasetId = datasetLocalResource.getDatasetId();
- long resourceId = localResource.getId();
-
- // Log resource details
- LOGGER.info(
- "Resource: {}, Expected Partition: {}, Actual Partition: {}, Dataset ID: {}, Resource ID: {}",
- resourcePath, expectedPartitionId, actualPartitionId, datasetId, resourceId);
-
- // Generate the setupMockResource call that would create this resource
- LOGGER.info("setupMockResource(\"{}\", {}, {}, {});", resourcePath, datasetId, expectedPartitionId,
- resourceId);
-
- if (expectedPartitionId == actualPartitionId) {
- correctResources++;
- } else {
- incorrectResources++;
- LOGGER.error("PARTITION MISMATCH in mock resources: {} (expected: {}, actual: {})",
- resourcePath, expectedPartitionId, actualPartitionId);
- }
- } else {
- LOGGER.warn("Resource object is not a DatasetLocalResource: {}", resourceObj);
- }
- } else {
- LOGGER.warn("Resource path doesn't match expected pattern: {}", resourcePath);
- }
- }
-
- LOGGER.info("Resources with correct partition IDs: {}", correctResources);
- LOGGER.info("Resources with incorrect partition IDs: {}", incorrectResources);
-
- // Fail the test if any resources have incorrect partition IDs
- assertEquals("All resources should have correct partition IDs", 0, incorrectResources);
-
- // Now test a direct mock creation and retrieval to see if that works correctly
- LOGGER.info("Testing direct mock creation and retrieval");
-
- // Create a test mock resource with partition 1
- String testPath = "test/partition_1/Default/SDefault/testDs/0/testIndex"; // 0 is replication factor (always 0)
- int testDatasetId = 999;
- int testPartition = 1;
- long testResourceId = 12345;
-
- // Create a mock dataset local resource
- DatasetLocalResource mockDatasetResource = mock(DatasetLocalResource.class);
- when(mockDatasetResource.getDatasetId()).thenReturn(testDatasetId);
- when(mockDatasetResource.getPartition()).thenReturn(testPartition);
-
- // Create a mock LSMINDEX (not just IIndex) since recovery expects ILSMIndex
- ILSMIndex mockIndex = mock(ILSMIndex.class);
- when(mockIndex.toString()).thenReturn(testPath);
-
- // Add necessary behavior for LSM operations
- when(mockIndex.isDurable()).thenReturn(true);
- when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-
- // Allow this resource to be created
- when(mockDatasetResource.createInstance(any())).thenReturn(mockIndex);
-
- // Create a mock local resource
- LocalResource mockLocalResource = mock(LocalResource.class);
- when(mockLocalResource.getId()).thenReturn(testResourceId);
- when(mockLocalResource.getPath()).thenReturn(testPath);
- when(mockLocalResource.getResource()).thenReturn(mockDatasetResource);
-
- // Add to mock resources map
- concurrentTest.getMockResources().put(testPath, mockLocalResource);
-
- // Create a mock file reference
- FileReference mockFileRef = mock(FileReference.class);
- when(mockFileRef.getRelativePath()).thenReturn(testPath);
- when(mockFileRef.getAbsolutePath()).thenReturn(testPath);
- when(concurrentTest.getMockServiceContext().getIoManager().resolveAbsolutePath(testPath))
- .thenReturn(mockFileRef);
- }
-
- /**
- * Creates a mock resource for testing.
- */
- private void setupMockResourceForTest(String resourcePath, int datasetId, int partition, long resourceId)
- throws HyracksDataException {
- // Create a mock dataset local resource
- DatasetLocalResource mockDatasetResource = mock(DatasetLocalResource.class);
- when(mockDatasetResource.getDatasetId()).thenReturn(datasetId);
- when(mockDatasetResource.getPartition()).thenReturn(partition);
-
- // Ensure we have a dataset resource
- DatasetResource datasetResource = concurrentTest.getDatasetLifecycleManager().getDatasetLifecycle(datasetId);
-
- // Create op tracker if needed
- PrimaryIndexOperationTracker opTracker = datasetResource.getOpTracker(partition);
- if (opTracker == null) {
- opTracker = mock(PrimaryIndexOperationTracker.class);
- datasetResource.setPrimaryIndexOperationTracker(partition, opTracker);
- LOGGER.info("Created operation tracker for dataset {} partition {}", datasetId, partition);
- }
-
- // Create a mock local resource
- LocalResource mockLocalResource = mock(LocalResource.class);
- when(mockLocalResource.getId()).thenReturn(resourceId);
- when(mockLocalResource.getPath()).thenReturn(resourcePath);
- when(mockLocalResource.getResource()).thenReturn(mockDatasetResource);
-
- // Add to mock resources map
- concurrentTest.getMockResources().put(resourcePath, mockLocalResource);
-
- // Create a mock file reference
- FileReference mockFileRef = mock(FileReference.class);
- when(mockFileRef.getRelativePath()).thenReturn(resourcePath);
- when(mockFileRef.getAbsolutePath()).thenReturn(resourcePath);
- when(concurrentTest.getMockServiceContext().getIoManager().resolveAbsolutePath(resourcePath))
- .thenReturn(mockFileRef);
-
- // Ensure the mock index is the one that will be returned during registration
- concurrentTest.createMockIndexForPath(resourcePath, datasetId, partition, opTracker);
- }
-
- /**
- * Tests that unregister operation succeeds during lazy recovery.
- * This test verifies that resources are correctly unregistered during lazy recovery.
- */
- @Test
- public void testConcurrentUnregisterDuringLazyRecovery() throws Exception {
- LOGGER.info("Starting testConcurrentUnregisterDuringLazyRecovery");
-
- // We need at least 6 resources for a good test
- final int REQUIRED_RESOURCES = 6;
-
- // Get multiple resources from the same dataset and partition
- List<String> resourcesInSamePartition = findResourcesInSamePartition(REQUIRED_RESOURCES);
-
- // If we don't have enough resources, create more
- if (resourcesInSamePartition.size() < REQUIRED_RESOURCES) {
- LOGGER.info("Only found {} resources, creating additional resources", resourcesInSamePartition.size());
-
- // Get dataset and partition info from the existing resources, or create new ones
- int datasetId;
- int partitionId;
- String datasetName;
-
- if (!resourcesInSamePartition.isEmpty()) {
- // Use the same dataset and partition as existing resources
- String existingResource = resourcesInSamePartition.get(0);
- LocalResource localResource = concurrentTest.getMockResources().get(existingResource);
- DatasetLocalResource datasetLocalResource = (DatasetLocalResource) localResource.getResource();
- datasetId = datasetLocalResource.getDatasetId();
- partitionId = datasetLocalResource.getPartition();
-
- // Extract dataset name from the existing resource path
- Pattern datasetPattern = Pattern.compile(".*?/partition_\\d+/[^/]+/[^/]+/([^/]+)/\\d+/.*");
- Matcher matcher = datasetPattern.matcher(existingResource);
- if (matcher.matches()) {
- datasetName = matcher.group(1);
- LOGGER.info("Using dataset name from existing resource: {}", datasetName);
- } else {
- datasetName = "testDs";
- LOGGER.warn("Could not extract dataset name from {}, using default: {}", existingResource,
- datasetName);
- }
- } else {
- // Create new dataset and partition
- datasetId = 999; // Use a unique dataset ID
- partitionId = 1; // Use partition 1
- datasetName = "testDs";
- }
-
- LOGGER.info("Creating resources for dataset {} partition {}", datasetId, partitionId);
-
- // Create additional resources until we reach the required count
- for (int i = resourcesInSamePartition.size(); i < REQUIRED_RESOURCES; i++) {
- String indexName = "test_index_" + i;
- // Use the dataset name extracted from existing resources
- String resourcePath = String.format("%s/partition_%d/%s/%s/%s/0/%s", "storage", partitionId, "Default",
- "SDefault", datasetName, indexName);
-
- // Create new mock resources
- setupMockResourceForTest(resourcePath, datasetId, partitionId, datasetId * 1000L + i);
- resourcesInSamePartition.add(resourcePath);
- // LOGGER.info("Created additional resource: {}", resourcePath);
- }
- }
-
- LOGGER.info("Have {} resources for test", resourcesInSamePartition.size());
-
- // Register ALL resources
- List<String> registeredResources = new ArrayList<>();
- for (String resourcePath : resourcesInSamePartition) {
- IIndex index = concurrentTest.registerIfAbsentIndex(resourcePath);
- assertNotNull("Index should be registered successfully: " + resourcePath, index);
- registeredResources.add(resourcePath);
- LOGGER.info("Registered resource: {}", resourcePath);
- }
-
- // We'll use 1/3 of resources for unregistering
- int unregisterCount = Math.max(2, resourcesInSamePartition.size() / 3);
-
- // Split resources: one for opening, several for unregistering, rest for recovery
- String resourceForOpen = registeredResources.get(0);
- List<String> resourcesToUnregister = registeredResources.subList(1, 1 + unregisterCount);
- List<String> otherResources = registeredResources.subList(1 + unregisterCount, registeredResources.size());
-
- LOGGER.info("Using {} resources: 1 for open, {} for unregistering, {} for verification",
- registeredResources.size(), resourcesToUnregister.size(), otherResources.size());
-
- // Get the dataset and partition ID for tracking
- LocalResource localResourceA = concurrentTest.getMockResources().get(resourceForOpen);
- DatasetLocalResource datasetLocalResourceA = (DatasetLocalResource) localResourceA.getResource();
- int datasetId = datasetLocalResourceA.getDatasetId();
- int partitionId = datasetLocalResourceA.getPartition();
-
- LOGGER.info("Test using dataset {} partition {}", datasetId, partitionId);
-
- // Set up mock for getResources to return actual resources during recovery
- ILocalResourceRepository mockRepository = concurrentTest.getMockResourceRepository();
- doAnswer(inv -> {
- List<FileReference> refs = inv.getArgument(1);
- if (refs != null && !refs.isEmpty()) {
- // Create a map of resources to return
- Map<Long, LocalResource> result = new HashMap<>();
- // Add our test resources from the same partition
- for (String path : registeredResources) {
- LocalResource res = concurrentTest.getMockResources().get(path);
- if (res != null) {
- result.put(res.getId(), res);
- LOGGER.info("Adding resource to recovery batch: {}", path);
- }
- }
- return result;
- }
- return Collections.emptyMap();
- }).when(mockRepository).getResources(any(), any());
-
- // Ensure operation tracker is properly initialized for the dataset resources
- DatasetResource datasetResource = concurrentTest.getDatasetLifecycleManager().getDatasetLifecycle(datasetId);
- if (datasetResource.getOpTracker(partitionId) == null) {
- PrimaryIndexOperationTracker opTracker = mock(PrimaryIndexOperationTracker.class);
- datasetResource.setPrimaryIndexOperationTracker(partitionId, opTracker);
- LOGGER.info("Created and set operation tracker for dataset {} partition {}", datasetId, partitionId);
- }
-
- // Verify operation tracker exists before proceeding
- assertNotNull("Operation tracker should be initialized for partition " + partitionId,
- datasetResource.getOpTracker(partitionId));
-
- // Execute operations with controlled timing
- CountDownLatch recoveryStartedLatch = new CountDownLatch(1);
- CountDownLatch unregisterCompleteLatch = new CountDownLatch(1);
-
- // Mock the recovery to signal when it starts
- IRecoveryManager mockRecoveryManager = concurrentTest.getMockRecoveryManager();
- doAnswer(invocation -> {
- List<ILSMIndex> indexes = invocation.getArgument(0);
- LOGGER.info("Recovery started with {} indexes", indexes.size());
-
- // Signal that recovery has started
- recoveryStartedLatch.countDown();
-
- // Wait for unregister to complete before continuing
- LOGGER.info("Waiting for unregister to complete");
- unregisterCompleteLatch.await(10, TimeUnit.SECONDS);
- LOGGER.info("Continuing with recovery after unregister");
-
- // Mark resources as recovered
- for (ILSMIndex index : indexes) {
- String path = index.toString();
- recoveredResourcePaths.add(path);
- // Remove individual resource logs
- }
- // Add a summary after the loop
- LOGGER.info("Marked {} resources as recovered", indexes.size());
-
- return null;
- }).when(mockRecoveryManager).recoverIndexes(anyList());
-
- // Thread 1: Open resource for open (will trigger recovery for all)
- Future<?> openFuture = executorService.submit(() -> {
- try {
- LOGGER.info("Thread 1: Opening resource to trigger recovery...");
- concurrentTest.getDatasetLifecycleManager().open(resourceForOpen);
- LOGGER.info("Thread 1: Successfully opened resource");
- } catch (Exception e) {
- LOGGER.error("Thread 1: Error opening resource", e);
- }
- });
-
- // Wait for recovery to start
- LOGGER.info("Waiting for recovery to start");
- boolean recoveryStarted = recoveryStartedLatch.await(10, TimeUnit.SECONDS);
- assertTrue("Recovery should have started", recoveryStarted);
- LOGGER.info("Recovery has started");
-
- // Thread 2: Unregister multiple resources while recovery is happening
- Future<?> unregisterFuture = executorService.submit(() -> {
- try {
- LOGGER.info("Thread 2: Unregistering {} resources...", resourcesToUnregister.size());
-
- for (String resourcePath : resourcesToUnregister) {
- concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
- }
-
- // Signal that unregister is complete
- unregisterCompleteLatch.countDown();
- LOGGER.info("Thread 2: Successfully unregistered all resources");
- } catch (Exception e) {
- LOGGER.error("Thread 2: Error unregistering resources", e);
- // Ensure we don't deadlock in case of failure
- unregisterCompleteLatch.countDown();
- }
- });
-
- // Wait for unregister to complete
- unregisterFuture.get(50, TimeUnit.SECONDS);
-
- // Wait for open to complete
- openFuture.get(10, TimeUnit.SECONDS);
-
- // Verify expectations
- LOGGER.info("Checking final state");
-
- // Check which resources were recovered
- List<String> recoveredResources = new ArrayList<>();
- List<String> nonRecoveredResources = new ArrayList<>();
-
- for (String resourcePath : registeredResources) {
- if (isResourcePathRecovered(resourcePath)) {
- recoveredResources.add(resourcePath);
- } else {
- nonRecoveredResources.add(resourcePath);
- }
- }
-
- LOGGER.info("Results: {} resources recovered, {} resources not recovered", recoveredResources.size(),
- nonRecoveredResources.size());
-
- // Resource for open should have been opened successfully
- assertTrue("Resource for open should be recovered", isResourcePathRecovered(resourceForOpen));
-
- // Resources to unregister should be unregistered
- for (String resourcePath : resourcesToUnregister) {
- IIndex indexAfter = concurrentTest.getDatasetLifecycleManager().get(resourcePath);
- assertNull("Resource should be unregistered: " + resourcePath, indexAfter);
- }
-
- // Other resources should either be recovered or not, depending on timing
- LOGGER.info("Completed testConcurrentUnregisterDuringLazyRecovery");
- }
-
- /**
- * Helper method to find resources that belong to the same dataset and partition.
- *
- * @param minCount Minimum number of resources needed
- * @return List of resource paths from the same dataset and partition
- */
- private List<String> findResourcesInSamePartition(int minCount) {
- // Group resources by dataset ID and partition ID
- Map<String, List<String>> resourcesByDatasetAndPartition = new HashMap<>();
-
- LOGGER.info("Looking for at least {} resources in the same partition", minCount);
-
- for (String resourcePath : concurrentTest.getCreatedResourcePaths()) {
- LocalResource localResource = concurrentTest.getMockResources().get(resourcePath);
- if (localResource != null) {
- DatasetLocalResource datasetLocalResource = (DatasetLocalResource) localResource.getResource();
- int datasetId = datasetLocalResource.getDatasetId();
- int partitionId = datasetLocalResource.getPartition();
-
- // Create a composite key using datasetId and partitionId
- String key = datasetId + "_" + partitionId;
- resourcesByDatasetAndPartition.computeIfAbsent(key, k -> new ArrayList<>()).add(resourcePath);
- }
- }
-
- // Just log summary, not details
- LOGGER.info("Found {} groups of resources by dataset/partition", resourcesByDatasetAndPartition.size());
-
- // Find a group with enough resources
- for (List<String> resources : resourcesByDatasetAndPartition.values()) {
- if (resources.size() >= minCount) {
- String firstResource = resources.get(0);
- LocalResource localResource = concurrentTest.getMockResources().get(firstResource);
- DatasetLocalResource datasetLocalResource = (DatasetLocalResource) localResource.getResource();
-
- LOGGER.info("Found suitable group with {} resources (dataset={}, partition={})", resources.size(),
- datasetLocalResource.getDatasetId(), datasetLocalResource.getPartition());
- return resources;
- }
- }
-
- // If no group has enough resources, return the one with the most
- List<String> bestGroup = resourcesByDatasetAndPartition.values().stream()
- .max(Comparator.comparingInt(List::size)).orElse(Collections.emptyList());
-
- if (!bestGroup.isEmpty()) {
- String firstResource = bestGroup.get(0);
- LocalResource localResource = concurrentTest.getMockResources().get(firstResource);
- DatasetLocalResource datasetLocalResource = (DatasetLocalResource) localResource.getResource();
-
- LOGGER.info("Using best available group: {} resources (dataset={}, partition={})", bestGroup.size(),
- datasetLocalResource.getDatasetId(), datasetLocalResource.getPartition());
- } else {
- LOGGER.warn("Could not find any resources in the same partition");
- }
-
- return bestGroup;
- }
-
- /**
- * Helper method to check if a resource path is contained in the recovered paths.
- * This handles potential differences between the resource path string and the index.toString() format.
- */
- private boolean isResourcePathRecovered(String resourcePath) {
- // Direct match check
- if (recoveredResourcePaths.contains(resourcePath)) {
- return true;
- }
-
- // For each recovered path, check if it contains the resource path
- for (String recoveredPath : recoveredResourcePaths) {
- if (recoveredPath.equals(resourcePath) ||
- // Check if the recoveredPath contains the resourcePath (for formatted toString values)
- recoveredPath.contains(resourcePath)) {
- return true;
- }
- }
- return false;
- }
-}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 320d089..cb4e068 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -138,13 +138,9 @@
@Override
public LocalResource get(String relativePath) throws HyracksDataException {
- LocalResource resource = getLocalResourceFromCache(relativePath);
- if (resource != null) {
- return resource;
- }
beforeReadAccess();
try {
- resource = resourceCache.getIfPresent(relativePath);
+ LocalResource resource = resourceCache.getIfPresent(relativePath);
if (resource == null) {
FileReference resourceFile = getLocalResourceFileByName(ioManager, relativePath);
resource = readLocalResource(resourceFile);
@@ -158,20 +154,6 @@
}
}
- private LocalResource getLocalResourceFromCache(String relativePath) {
- LocalResource resource;
- beforeWriteAccess();
- try {
- resource = resourceCache.getIfPresent(relativePath);
- if (resource != null) {
- return resource;
- }
- } finally {
- afterWriteAccess();
- }
- return null;
- }
-
@SuppressWarnings("squid:S1181")
@Override
public void insert(LocalResource resource) throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
index 3f87c6d..f5edc74 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
@@ -68,7 +68,7 @@
ILocalResourceRepositoryFactory localResourceRepositoryFactory = new TransientLocalResourceRepositoryFactory();
localResourceRepository = localResourceRepositoryFactory.createRepository();
resourceIdFactory = (new ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory();
- lcManager = new IndexLifecycleManager(appCtx, localResourceRepository);
+ lcManager = new IndexLifecycleManager();
}
public void close() throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
index b03f54f..8a67c71 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
@@ -109,11 +109,6 @@
} catch (IOException e) {
throw HyracksDataException.create(e);
}
- IIndex registeredIndex = lcManager.registerIfAbsent(resourceRelPath, index);
- if (registeredIndex != index) {
- // some other thread has registered the index
- // indicating this is not the first time, the index is being created
- throw new HyracksDataException("Index with resource ID " + resourceId + " already exists.");
- }
+ lcManager.register(resourceRelPath, index);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
index 550ad24..b79c3b1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
@@ -20,25 +20,32 @@
package org.apache.hyracks.storage.am.common.dataflow;
import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.ILocalResourceRepository;
+import org.apache.hyracks.storage.common.IResource;
import org.apache.hyracks.storage.common.IResourceLifecycleManager;
import org.apache.hyracks.storage.common.IStorageManager;
import org.apache.hyracks.storage.common.LocalResource;
-import org.apache.hyracks.util.annotations.NotThreadSafe;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
-@NotThreadSafe
public class IndexDataflowHelper implements IIndexDataflowHelper {
+ private static final Logger LOGGER = LogManager.getLogger();
+ private final INCServiceContext ctx;
private final IResourceLifecycleManager<IIndex> lcManager;
private final ILocalResourceRepository localResourceRepository;
private final FileReference resourceRef;
private IIndex index;
- public IndexDataflowHelper(final INCServiceContext ctx, IStorageManager storageMgr, FileReference resourceRef) {
+ public IndexDataflowHelper(final INCServiceContext ctx, IStorageManager storageMgr, FileReference resourceRef)
+ throws HyracksDataException {
+ this.ctx = ctx;
this.lcManager = storageMgr.getLifecycleManager(ctx);
this.localResourceRepository = storageMgr.getLocalResourceRepository(ctx);
this.resourceRef = resourceRef;
@@ -51,18 +58,56 @@
@Override
public void open() throws HyracksDataException {
- index = lcManager.registerIfAbsent(resourceRef.getRelativePath(), index);
- lcManager.open(resourceRef.getRelativePath());
+ //Get local resource file
+ synchronized (lcManager) {
+ index = lcManager.get(resourceRef.getRelativePath());
+ if (index == null) {
+ LocalResource lr = readIndex();
+ lcManager.register(lr.getPath(), index);
+ }
+ lcManager.open(resourceRef.getRelativePath());
+ }
+ }
+
+ private LocalResource readIndex() throws HyracksDataException {
+ // Get local resource
+ LocalResource lr = getResource();
+ if (lr == null) {
+ throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourceRef.getRelativePath());
+ }
+ IResource resource = lr.getResource();
+ index = resource.createInstance(ctx);
+ return lr;
}
@Override
public void close() throws HyracksDataException {
- lcManager.close(resourceRef.getRelativePath());
+ synchronized (lcManager) {
+ lcManager.close(resourceRef.getRelativePath());
+ }
}
@Override
public void destroy() throws HyracksDataException {
- lcManager.destroy(resourceRef.getRelativePath());
+ LOGGER.log(Level.INFO, "Dropping index " + resourceRef.getRelativePath() + " on node " + ctx.getNodeId());
+ synchronized (lcManager) {
+ index = lcManager.get(resourceRef.getRelativePath());
+ if (index != null) {
+ lcManager.unregister(resourceRef.getRelativePath());
+ } else {
+ readIndex();
+ }
+
+ if (getResourceId() != -1) {
+ localResourceRepository.delete(resourceRef.getRelativePath());
+ }
+ index.destroy();
+ }
+ }
+
+ private long getResourceId() throws HyracksDataException {
+ LocalResource lr = localResourceRepository.get(resourceRef.getRelativePath());
+ return lr == null ? -1 : lr.getId();
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
index dd80bcb..ab301a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
@@ -25,33 +25,24 @@
import java.util.List;
import java.util.Map;
-import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.storage.common.IIndex;
-import org.apache.hyracks.storage.common.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.IResource;
import org.apache.hyracks.storage.common.IResourceLifecycleManager;
-import org.apache.hyracks.storage.common.LocalResource;
public class IndexLifecycleManager implements IResourceLifecycleManager<IIndex>, ILifeCycleComponent {
private static final long DEFAULT_MEMORY_BUDGET = 1024 * 1024 * 100; // 100 megabytes
private final Map<String, IndexInfo> indexInfos;
private final long memoryBudget;
- private final INCServiceContext appCtx;
- private final ILocalResourceRepository resourceRepository;
private long memoryUsed;
- public IndexLifecycleManager(INCServiceContext appCtx, ILocalResourceRepository resourceRepository) {
- this(appCtx, resourceRepository, DEFAULT_MEMORY_BUDGET);
+ public IndexLifecycleManager() {
+ this(DEFAULT_MEMORY_BUDGET);
}
- public IndexLifecycleManager(INCServiceContext appCtx, ILocalResourceRepository resourceRepository,
- long memoryBudget) {
- this.appCtx = appCtx;
- this.resourceRepository = resourceRepository;
+ public IndexLifecycleManager(long memoryBudget) {
this.indexInfos = new HashMap<>();
this.memoryBudget = memoryBudget;
this.memoryUsed = 0;
@@ -168,34 +159,11 @@
}
@Override
- public IIndex registerIfAbsent(String resourcePath, IIndex index) throws HyracksDataException {
+ public void register(String resourcePath, IIndex index) throws HyracksDataException {
if (indexInfos.containsKey(resourcePath)) {
- return indexInfos.get(resourcePath).index;
- }
- if (index == null) {
- index = getOrCreate(resourcePath);
+ throw new HyracksDataException("Index with resource name " + resourcePath + " already exists.");
}
indexInfos.put(resourcePath, new IndexInfo(index));
- return index;
- }
-
- public IIndex getOrCreate(String resourcePath) throws HyracksDataException {
- IIndex index = get(resourcePath);
- if (index == null) {
- index = readIndex(resourcePath);
- registerIfAbsent(resourcePath, index);
- }
- return index;
- }
-
- private IIndex readIndex(String resourcePath) throws HyracksDataException {
- // Get local resource
- LocalResource lr = resourceRepository.get(resourcePath);
- if (lr == null) {
- throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
- }
- IResource resource = lr.getResource();
- return resource.createInstance(appCtx);
}
@Override
@@ -241,24 +209,4 @@
}
indexInfos.remove(resourcePath);
}
-
- @Override
- public void destroy(String resourcePath) throws HyracksDataException {
- IIndex index = get(resourcePath);
- if (index != null) {
- unregister(resourcePath);
- } else {
- readIndex(resourcePath);
- }
-
- if (getResourceId(resourcePath) != -1) {
- resourceRepository.delete(resourcePath);
- }
- index.destroy();
- }
-
- private long getResourceId(String resourcePath) throws HyracksDataException {
- LocalResource lr = resourceRepository.get(resourcePath);
- return lr == null ? -1 : lr.getId();
- }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceLifecycleManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceLifecycleManager.java
index 1d74379..6200680 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceLifecycleManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceLifecycleManager.java
@@ -41,8 +41,10 @@
*
* @param resourceId
* @param resource
+ * @throws HyracksDataException
+ * if a resource is already registered with this resourceId
*/
- public R registerIfAbsent(String resourceId, R resource) throws HyracksDataException;
+ public void register(String resourceId, R resource) throws HyracksDataException;
/**
* Opens a resource. The resource is moved to the open state
@@ -73,15 +75,8 @@
/**
* unregister a resource removing its resources in memory and on disk
*
- * @param resourcePath
+ * @param resourceId
* @throws HyracksDataException
*/
- public void unregister(String resourcePath) throws HyracksDataException;
-
- /**
- * delete the resource
- * @param resourcePath
- * @throws HyracksDataException
- */
- public void destroy(String resourcePath) throws HyracksDataException;
+ public void unregister(String resourceId) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
index 1328bc4..0b4d2ed 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
@@ -59,7 +59,7 @@
@Override
public IResourceLifecycleManager<IIndex> getLifecycleManager(INCServiceContext ctx) {
- return TestStorageManagerComponentHolder.getIndexLifecycleManager(ctx);
+ return TestStorageManagerComponentHolder.getIndexLifecycleManager();
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
index 1f7c0ab..7f696b4 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
@@ -78,9 +78,9 @@
lcManager = null;
}
- public synchronized static IResourceLifecycleManager<IIndex> getIndexLifecycleManager(INCServiceContext ctx) {
+ public synchronized static IResourceLifecycleManager<IIndex> getIndexLifecycleManager() {
if (lcManager == null) {
- lcManager = new IndexLifecycleManager(ctx, getLocalResourceRepository());
+ lcManager = new IndexLifecycleManager();
}
return lcManager;
}