[ASTERIXDB-3563][STO] Delay activation of dataset until accessed
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Activate the dataset when it is being accessed in cloud mode.
Ext-ref: MB-63037
Change-Id: If64a7fbd9701772ffa42761d1557223a3eea95be
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19443
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ritik Raj <ritik.raj@couchbase.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Ritik Raj <ritik.raj@couchbase.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index de4a066..007826b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -286,8 +286,9 @@
// Must start vbc now instead of by life cycle component manager (lccm) because lccm happens after
// the metadata bootstrap task
((ILifeCycleComponent) virtualBufferCache).start();
- datasetLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository,
- txnSubsystem.getLogManager(), virtualBufferCache, indexCheckpointManagerProvider, lockNotifier);
+ datasetLifecycleManager =
+ new DatasetLifecycleManager(ncServiceContext, storageProperties, localResourceRepository, recoveryMgr,
+ txnSubsystem.getLogManager(), virtualBufferCache, indexCheckpointManagerProvider, lockNotifier);
localResourceRepository.setDatasetLifecycleManager(datasetLifecycleManager);
final String nodeId = getServiceContext().getNodeId();
final Set<Integer> nodePartitions = metadataProperties.getNodePartitions(nodeId);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 179b996..269f6f7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -901,6 +901,16 @@
return maxDiskLastLsn;
}
+ @Override
+ public boolean isLazyRecoveryEnabled() {
+ return false;
+ }
+
+ @Override
+ public void recoverIndexes(List<ILSMIndex> datasetPartitionIndexes) throws HyracksDataException {
+ // intentionally a no-op: isLazyRecoveryEnabled() returns false for this recovery manager
+ }
+
private class JobEntityCommits {
private static final String PARTITION_FILE_NAME_SEPARATOR = "_";
private final long txnId;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 43b5d1b..85916b0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -36,6 +36,7 @@
import org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.asterix.common.dataflow.LSMIndexUtil;
import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.storage.DatasetResourceReference;
import org.apache.asterix.common.storage.IIndexCheckpointManager;
@@ -43,12 +44,16 @@
import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.storage.StorageIOStats;
import org.apache.asterix.common.transactions.ILogManager;
+import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.LogType;
import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeLocalResource;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -67,22 +72,26 @@
public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeCycleComponent {
private static final Logger LOGGER = LogManager.getLogger();
- private final Map<Integer, DatasetResource> datasets = new ConcurrentHashMap<>();
+ protected final Map<Integer, DatasetResource> datasets = new ConcurrentHashMap<>();
private final StorageProperties storageProperties;
- private final ILocalResourceRepository resourceRepository;
+ protected final ILocalResourceRepository resourceRepository;
private final IVirtualBufferCache vbc;
+ protected final INCServiceContext serviceCtx;
+ protected final IRecoveryManager recoveryMgr;
private final ILogManager logManager;
private final LogRecord waitLog;
- private final IDiskResourceCacheLockNotifier lockNotifier;
+ protected final IDiskResourceCacheLockNotifier lockNotifier;
private volatile boolean stopped = false;
private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
// all LSM-trees share the same virtual buffer cache list
private final List<IVirtualBufferCache> vbcs;
- public DatasetLifecycleManager(StorageProperties storageProperties, ILocalResourceRepository resourceRepository,
- ILogManager logManager, IVirtualBufferCache vbc,
- IIndexCheckpointManagerProvider indexCheckpointManagerProvider,
+ public DatasetLifecycleManager(INCServiceContext serviceCtx, StorageProperties storageProperties,
+ ILocalResourceRepository resourceRepository, IRecoveryManager recoveryMgr, ILogManager logManager,
+ IVirtualBufferCache vbc, IIndexCheckpointManagerProvider indexCheckpointManagerProvider,
IDiskResourceCacheLockNotifier lockNotifier) {
+ this.serviceCtx = serviceCtx;
+ this.recoveryMgr = recoveryMgr;
this.logManager = logManager;
this.storageProperties = storageProperties;
this.resourceRepository = resourceRepository;
@@ -130,7 +139,7 @@
datasetResource.register(resource, (ILSMIndex) index);
}
- private int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
+ protected int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
LocalResource lr = resourceRepository.get(resourcePath);
if (lr == null) {
return -1;
@@ -138,7 +147,7 @@
return ((DatasetLocalResource) lr.getResource()).getDatasetId();
}
- private long getResourceIDfromResourcePath(String resourcePath) throws HyracksDataException {
+ protected long getResourceIDfromResourcePath(String resourcePath) throws HyracksDataException {
LocalResource lr = resourceRepository.get(resourcePath);
if (lr == null) {
return -1;
@@ -146,6 +155,14 @@
return lr.getId();
}
+ private DatasetLocalResource getDatasetLocalResource(String resourcePath) throws HyracksDataException {
+ LocalResource lr = resourceRepository.get(resourcePath);
+ if (lr == null) {
+ return null;
+ }
+ return (DatasetLocalResource) lr.getResource();
+ }
+
@Override
public synchronized void unregister(String resourcePath) throws HyracksDataException {
validateDatasetLifecycleManagerState();
@@ -193,6 +210,72 @@
@Override
public synchronized void open(String resourcePath) throws HyracksDataException {
validateDatasetLifecycleManagerState();
+ DatasetLocalResource localResource = getDatasetLocalResource(resourcePath);
+ if (localResource == null) {
+ throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
+ }
+ int did = getDIDfromResourcePath(resourcePath);
+ long resourceID = getResourceIDfromResourcePath(resourcePath);
+
+ lockNotifier.onOpen(resourceID);
+ try {
+ DatasetResource datasetResource = datasets.get(did);
+ int partition = localResource.getPartition();
+ if (shouldRecoverLazily(datasetResource, partition)) {
+ performLocalRecovery(resourcePath, datasetResource, partition);
+ } else {
+ openResource(resourcePath, false);
+ }
+ } finally {
+ lockNotifier.onClose(resourceID);
+ }
+ }
+
+ private void performLocalRecovery(String resourcePath, DatasetResource datasetResource, int partition)
+ throws HyracksDataException {
+ LOGGER.debug("performing local recovery for dataset {} partition {}", datasetResource.getDatasetInfo(),
+ partition);
+ FileReference indexRootRef = StoragePathUtil.getIndexRootPath(serviceCtx.getIoManager(), resourcePath);
+ Map<Long, LocalResource> resources = resourceRepository.getResources(r -> true, List.of(indexRootRef));
+
+ List<ILSMIndex> indexes = new ArrayList<>();
+ for (LocalResource resource : resources.values()) {
+ if (shouldSkipResource(resource)) {
+ continue;
+ }
+
+ ILSMIndex index = getOrCreateIndex(resource);
+ boolean undoTouch = !resourcePath.equals(resource.getPath());
+ openResource(resource.getPath(), undoTouch);
+ indexes.add(index);
+ }
+
+ if (!indexes.isEmpty()) {
+ recoveryMgr.recoverIndexes(indexes);
+ }
+
+ datasetResource.markRecovered(partition);
+ }
+
+ private boolean shouldSkipResource(LocalResource resource) {
+ DatasetLocalResource lr = (DatasetLocalResource) resource.getResource();
+ return MetadataIndexImmutableProperties.isMetadataDataset(lr.getDatasetId())
+ || (lr.getResource() instanceof LSMBTreeLocalResource
+ && ((LSMBTreeLocalResource) lr.getResource()).isSecondaryNoIncrementalMaintenance());
+ }
+
+ private ILSMIndex getOrCreateIndex(LocalResource resource) throws HyracksDataException {
+ ILSMIndex index = get(resource.getPath());
+ if (index == null) {
+ DatasetLocalResource lr = (DatasetLocalResource) resource.getResource();
+ index = (ILSMIndex) lr.createInstance(serviceCtx);
+ register(resource.getPath(), index);
+ }
+ return index;
+ }
+
+ private void openResource(String resourcePath, boolean undoTouch) throws HyracksDataException {
+ validateDatasetLifecycleManagerState();
int did = getDIDfromResourcePath(resourcePath);
long resourceID = getResourceIDfromResourcePath(resourcePath);
@@ -214,15 +297,36 @@
dsr.open(true);
dsr.touch();
-
- if (!iInfo.isOpen()) {
- ILSMOperationTracker opTracker = iInfo.getIndex().getOperationTracker();
- synchronized (opTracker) {
- iInfo.getIndex().activate();
+ boolean indexTouched = false;
+ try {
+ if (!iInfo.isOpen()) {
+ ILSMOperationTracker opTracker = iInfo.getIndex().getOperationTracker();
+ synchronized (opTracker) {
+ iInfo.getIndex().activate();
+ }
+ iInfo.setOpen(true);
}
- iInfo.setOpen(true);
+ iInfo.touch();
+ indexTouched = true;
+ } finally {
+ if (undoTouch) {
+ dsr.untouch();
+ if (indexTouched) {
+ iInfo.untouch();
+ }
+ lockNotifier.onClose(resourceID);
+ }
}
- iInfo.touch();
+ }
+
+ private boolean shouldRecoverLazily(DatasetResource resource, int partition) {
+ // Perform lazy recovery only if the following conditions are met:
+ // 1. Lazy recovery is enabled.
+ // 2. The resource does not belong to the Metadata dataverse.
+ // 3. The partition is being accessed for the first time.
+ return recoveryMgr.isLazyRecoveryEnabled()
+ && !MetadataIndexImmutableProperties.isMetadataDataset(resource.getDatasetID())
+ && !resource.isRecovered(partition);
}
public DatasetResource getDatasetLifecycle(int did) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index db9eabb..8e3081d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -20,7 +20,9 @@
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
@@ -48,12 +50,14 @@
private final Map<Integer, PrimaryIndexOperationTracker> datasetPrimaryOpTrackers;
private final Map<Integer, ILSMComponentIdGenerator> datasetComponentIdGenerators;
private final Map<Integer, IRateLimiter> datasetRateLimiters;
+ private final Set<Integer> recoveredPartitions;
public DatasetResource(DatasetInfo datasetInfo) {
this.datasetInfo = datasetInfo;
this.datasetPrimaryOpTrackers = new HashMap<>();
this.datasetComponentIdGenerators = new HashMap<>();
this.datasetRateLimiters = new HashMap<>();
+ this.recoveredPartitions = new HashSet<>();
}
public boolean isRegistered() {
@@ -127,6 +131,10 @@
return datasetComponentIdGenerators.get(partition);
}
+ public boolean isRecovered(int partitionId) {
+ return recoveredPartitions.contains(partitionId);
+ }
+
public IRateLimiter getRateLimiter(int partition) {
return datasetRateLimiters.get(partition);
}
@@ -139,6 +147,14 @@
datasetPrimaryOpTrackers.put(partition, opTracker);
}
+ public void markRecovered(int partition) {
+ if (recoveredPartitions.contains(partition)) {
+ throw new IllegalStateException(
+ "Index has already been recovered for dataset " + getDatasetID() + " partition " + partition);
+ }
+ recoveredPartitions.add(partition);
+ }
+
public void setIdGenerator(int partition, ILSMComponentIdGenerator idGenerator) {
if (datasetComponentIdGenerators.containsKey(partition)) {
throw new IllegalStateException("LSMComponentIdGenerator has already been set for partition " + partition);
@@ -187,5 +203,6 @@
datasetPrimaryOpTrackers.remove(partitionId);
datasetComponentIdGenerators.remove(partitionId);
datasetRateLimiters.remove(partitionId);
+ recoveredPartitions.remove(partitionId);
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index a5f79ac..3437d42 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -20,10 +20,12 @@
import java.io.File;
import java.io.IOException;
+import java.util.List;
import java.util.Set;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
/**
* Provides API for failure recovery. Failure could be at application level and
@@ -128,4 +130,19 @@
*/
void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush) throws HyracksDataException;
+ /**
+ * Ensures that {@code datasetPartitionIndexes} are consistent by performing component id level recovery
+ *
+ * @param datasetPartitionIndexes A list of indexes associated with a specific
+ * dataset partition that require recovery.
+ * @throws HyracksDataException If an error occurs during the recovery or rollback
+ * process, indicating a failure to achieve consistency.
+ */
+ void recoverIndexes(List<ILSMIndex> datasetPartitionIndexes) throws HyracksDataException;
+
+ /**
+ * Determines whether the indexes need to be recovered lazily, at the time of their first access.
+ * @return {@code true} if indexes are recovered lazily on first access, {@code false} otherwise
+ */
+ boolean isLazyRecoveryEnabled();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 28fd27e..2fdfba4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -139,6 +139,13 @@
return ResourceReference.of(fileAbsolutePath).getFileRelativePath().toString();
}
+ public static FileReference getIndexRootPath(IIOManager ioManager, String relativePath)
+ throws HyracksDataException {
+ int separatorIndex = relativePath.lastIndexOf(File.separatorChar);
+ String parentDirectory = relativePath.substring(0, separatorIndex);
+ return ioManager.resolve(parentDirectory);
+ }
+
/**
* Create a file
* Note: this method is not thread safe. It is the responsibility of the caller to ensure no path conflict when
@@ -229,7 +236,17 @@
}
public static boolean isRelativeParent(FileReference parent, FileReference child) {
- return child.getRelativePath().startsWith(parent.getRelativePath());
+ String childPath = child.getRelativePath();
+ String parentPath = parent.getRelativePath();
+ boolean isMatch = childPath.startsWith(parentPath);
+ if (isMatch) {
+ int parentPathLength = parentPath.length();
+ if (childPath.length() == parentPathLength) {
+ return true;
+ }
+ return childPath.charAt(parentPathLength) == File.separatorChar;
+ }
+ return false;
}
public static String getNamespacePath(INamespacePathResolver nsPathResolver, Namespace namespace, int partition) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 5c7d5ac..cb4e068 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -251,6 +251,7 @@
return ioManager.resolve(fileName);
}
+ @Override
public Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, List<FileReference> roots)
throws HyracksDataException {
beforeReadAccess();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/ILocalResourceRepository.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/ILocalResourceRepository.java
index 491d476..46eb1d5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/ILocalResourceRepository.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/ILocalResourceRepository.java
@@ -18,7 +18,12 @@
*/
package org.apache.hyracks.storage.common;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
public interface ILocalResourceRepository {
@@ -28,5 +33,8 @@
void delete(String name) throws HyracksDataException;
+ Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, List<FileReference> resourceFolderRoot)
+ throws HyracksDataException;
+
long maxId() throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/TransientLocalResourceRepository.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/TransientLocalResourceRepository.java
index 2e756ea..5f2ccc6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/TransientLocalResourceRepository.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/TransientLocalResourceRepository.java
@@ -19,9 +19,12 @@
package org.apache.hyracks.storage.common;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.function.Predicate;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
public class TransientLocalResourceRepository implements ILocalResourceRepository {
@@ -55,6 +58,12 @@
}
@Override
+ public Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, List<FileReference> roots)
+ throws HyracksDataException {
+ return Map.of();
+ }
+
+ @Override
public long maxId() throws HyracksDataException {
long maxResourceId = 0;