[NO ISSUE][STO] Download metadata files on lazy caching
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Download metadata files for the node storage partitions
on bootstrap on lazy caching.
- Use the local + uncached files list to perform list operations
rather than calling the cloud API.
- Delay partition clean up on promotion for cloud deployments.
Change-Id: I8ba767d5ffa0257a429af8c455bacea3df7ff7a7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17856
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
index b4223bc..0786895 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
@@ -32,6 +32,7 @@
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.common.transactions.IGlobalTransactionContext;
+import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.transaction.management.service.transaction.GlobalTransactionContext;
import org.apache.asterix.transaction.management.service.transaction.GlobalTxInfo;
import org.apache.hyracks.api.application.ICCServiceContext;
@@ -182,7 +183,7 @@
@Override
public void rollback() throws Exception {
- Set<FileReference> txnLogFileRefs = ioManager.list(ioManager.resolve("."));
+ Set<FileReference> txnLogFileRefs = ioManager.list(ioManager.resolve(StorageConstants.GLOBAL_TXN_DIR_NAME));
for (FileReference txnLogFileRef : txnLogFileRefs) {
IGlobalTransactionContext context = new GlobalTransactionContext(txnLogFileRef, ioManager);
txnContextRepository.put(context.getJobId(), context);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index 0684442..101b1b0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -129,9 +129,8 @@
LOGGER.warn("promoting partition {}", partition);
final PersistentLocalResourceRepository localResourceRepository =
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
- localResourceRepository.cleanup(partition);
- localResourceRepository.clearResourcesCache();
if (!appCtx.isCloudDeployment()) {
+ localResourceRepository.cleanup(partition);
final IRecoveryManager recoveryManager = appCtx.getTransactionSubsystem().getRecoveryManager();
recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()), true);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index 5d0901d..4135f35 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -219,7 +219,7 @@
protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state,
Set<Integer> activePartitions) {
final List<INCLifecycleTask> tasks = new ArrayList<>();
- Set<Integer> nodeActivePartitions = getNodeActivePartitions(newNodeId, activePartitions, metadataNode);
+ Set<Integer> nodeActivePartitions = getNodeActivePartitions(newNodeId, activePartitions, metadataNode, state);
tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING, nodeActivePartitions));
int metadataPartitionId = clusterManager.getMetadataPartition().getPartitionId();
// Add any cloud-related tasks
@@ -322,7 +322,8 @@
return true;
}
- protected Set<Integer> getNodeActivePartitions(String nodeId, Set<Integer> nodePartitions, boolean metadataNode) {
+ protected Set<Integer> getNodeActivePartitions(String nodeId, Set<Integer> nodePartitions, boolean metadataNode,
+ SystemState state) {
if (metadataNode) {
nodePartitions.add(clusterManager.getMetadataPartition().getPartitionId());
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 1a99a34..97a6173 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.cloud;
+import static org.apache.asterix.cloud.lazy.ParallelCacher.METADATA_FILTER;
import static org.apache.asterix.common.utils.StorageConstants.PARTITION_DIR_PREFIX;
import static org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
@@ -79,7 +80,8 @@
*/
@Override
- protected void downloadPartitions(boolean metadataNode, int metadataPartition) throws HyracksDataException {
+ protected synchronized void downloadPartitions(boolean metadataNode, int metadataPartition)
+ throws HyracksDataException {
// Get the files in all relevant partitions from the cloud
Set<String> cloudFiles = cloudClient.listObjects(bucket, STORAGE_ROOT_DIR_NAME, IoUtil.NO_OP_FILTER).stream()
.filter(f -> partitions.contains(StoragePathUtil.getPartitionNumFromRelativePath(f)))
@@ -99,12 +101,15 @@
cloudFiles.removeAll(localFiles);
int remainingUncachedFiles = cloudFiles.size();
if (remainingUncachedFiles > 0) {
+ LOGGER.debug("The number of uncached files: {}. Uncached files: {}", remainingUncachedFiles, cloudFiles);
// Get list of FileReferences from the list of cloud (i.e., resolve each path's string to FileReference)
List<FileReference> uncachedFiles = resolve(cloudFiles);
// Create a parallel downloader using the given cloudClient
IParallelDownloader downloader = cloudClient.createParallelDownloader(bucket, localIoManager);
// Download metadata partition (if this node is a metadata node)
downloadMetadataPartition(downloader, uncachedFiles, metadataNode, metadataPartition);
+ // Download all metadata files to avoid (List) calls to the cloud when listing/reading these files
+ downloadMetadataFiles(downloader, uncachedFiles);
// Create a parallel cacher which download and monitor all uncached files
ParallelCacher cacher = new ParallelCacher(downloader, uncachedFiles);
// Local cache misses some files, cloud-based accessor is needed for read operations
@@ -113,20 +118,18 @@
// Everything is cached, no need to invoke cloud-based accessor for read operations
accessor = new LocalAccessor(cloudClient, bucket, localIoManager);
}
-
- LOGGER.info("The number of uncached files: {}. Uncached files: {}", remainingUncachedFiles, cloudFiles);
}
private void downloadMetadataPartition(IParallelDownloader downloader, List<FileReference> uncachedFiles,
boolean metadataNode, int metadataPartition) throws HyracksDataException {
String partitionDir = PARTITION_DIR_PREFIX + metadataPartition;
if (metadataNode && uncachedFiles.stream().anyMatch(f -> f.getRelativePath().contains(partitionDir))) {
- LOGGER.info("Downloading metadata partition {}, Current uncached files: {}", metadataPartition,
+ LOGGER.debug("Downloading metadata partition {}, Current uncached files: {}", metadataPartition,
uncachedFiles);
FileReference metadataDir = resolve(STORAGE_ROOT_DIR_NAME + File.separator + partitionDir);
downloader.downloadDirectories(Collections.singleton(metadataDir));
uncachedFiles.removeIf(f -> f.getRelativePath().contains(partitionDir));
- LOGGER.info("Finished downloading metadata partition. Current uncached files: {}", uncachedFiles);
+ LOGGER.debug("Finished downloading metadata partition. Current uncached files: {}", uncachedFiles);
}
}
@@ -192,4 +195,19 @@
LOGGER.debug("{} {}", op, fileReference.getRelativePath());
}
}
+
+ private void downloadMetadataFiles(IParallelDownloader downloader, List<FileReference> uncachedFiles)
+ throws HyracksDataException {
+ Set<FileReference> uncachedMetadataFiles = ParallelCacher.getFiles(uncachedFiles, METADATA_FILTER);
+ if (!uncachedMetadataFiles.isEmpty()) {
+ LOGGER.debug("Downloading metadata files for all partitions; current uncached files: {}", uncachedFiles);
+ downloader.downloadFiles(uncachedMetadataFiles);
+ uncachedFiles.removeAll(uncachedMetadataFiles);
+ LOGGER.debug("Finished downloading metadata files for all partitions. Current uncached files: {}",
+ uncachedFiles);
+ } else {
+ LOGGER.debug("all metadata files for all partitions are already cached; current uncached files: {} ",
+ uncachedFiles);
+ }
+ }
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java
index 7e4bc4f..6600356 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java
@@ -18,7 +18,9 @@
*/
package org.apache.asterix.cloud.lazy;
+import java.io.FilenameFilter;
import java.util.Collection;
+import java.util.Set;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
@@ -32,6 +34,8 @@
*/
boolean isCached(FileReference indexDir);
+ Set<FileReference> getUncachedFiles(FileReference dir, FilenameFilter filter);
+
/**
* Downloads all index's data files for all partitions.
* The index is inferred from the path of the provided file.
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/NoOpParallelCacher.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/NoOpParallelCacher.java
index 010433d..a77652c 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/NoOpParallelCacher.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/NoOpParallelCacher.java
@@ -18,7 +18,10 @@
*/
package org.apache.asterix.cloud.lazy;
+import java.io.FilenameFilter;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
import org.apache.hyracks.api.io.FileReference;
@@ -31,6 +34,11 @@
}
@Override
+ public Set<FileReference> getUncachedFiles(FileReference dir, FilenameFilter filter) {
+ return Collections.emptySet();
+ }
+
+ @Override
public boolean downloadData(FileReference indexFile) {
return false;
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
index 2d55d06..3cc481e 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
@@ -26,6 +26,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
import org.apache.asterix.cloud.clients.IParallelDownloader;
import org.apache.asterix.common.utils.StorageConstants;
@@ -41,12 +42,11 @@
* @see org.apache.asterix.cloud.lazy.accessor.ReplaceableCloudAccessor
*/
public final class ParallelCacher implements IParallelCacher {
- private static final Logger LOGGER = LogManager.getLogger();
- private static final FilenameFilter METADATA_FILTER =
+
+ public static final FilenameFilter METADATA_FILTER =
((dir, name) -> name.startsWith(StorageConstants.INDEX_NON_DATA_FILES_PREFIX));
-
+ private static final Logger LOGGER = LogManager.getLogger();
private final IParallelDownloader downloader;
-
/**
* Uncached Indexes subpaths
*/
@@ -83,6 +83,19 @@
}
@Override
+ public Set<FileReference> getUncachedFiles(FileReference dir, FilenameFilter filter) {
+ if (dir.getRelativePath().endsWith(StorageConstants.STORAGE_ROOT_DIR_NAME)) {
+ return uncachedDataFiles.stream()
+ .filter(f -> StoragePathUtil.hasSameStorageRoot(dir, f) && filter.accept(null, f.getName()))
+ .collect(Collectors.toSet());
+ }
+ return uncachedDataFiles
+ .stream().filter(f -> StoragePathUtil.hasSameStorageRoot(dir, f)
+ && StoragePathUtil.isRelativeParent(dir, f) && filter.accept(null, f.getName()))
+ .collect(Collectors.toSet());
+ }
+
+ @Override
public synchronized boolean downloadData(FileReference indexFile) throws HyracksDataException {
String indexSubPath = StoragePathUtil.getIndexSubPath(indexFile, false);
Set<FileReference> toDownload = new HashSet<>();
@@ -92,13 +105,13 @@
}
}
- LOGGER.info("Downloading data files for {} in all partitions: {}", indexSubPath, toDownload);
+ LOGGER.debug("Downloading data files for {} in all partitions: {}", indexSubPath, toDownload);
Collection<FileReference> failed = downloader.downloadDirectories(toDownload);
if (!failed.isEmpty()) {
LOGGER.warn("Failed to download data files {}. Re-downloading: {}", indexSubPath, failed);
downloader.downloadFiles(failed);
}
- LOGGER.info("Finished downloading data files for {}", indexSubPath);
+ LOGGER.debug("Finished downloading data files for {}", indexSubPath);
uncachedIndexes.remove(indexSubPath);
uncachedDataFiles.removeIf(f -> f.getRelativePath().contains(indexSubPath));
return isEmpty();
@@ -114,9 +127,9 @@
}
}
- LOGGER.info("Downloading metadata files for {} in all partitions: {}", indexSubPath, toDownload);
+ LOGGER.debug("Downloading metadata files for {} in all partitions: {}", indexSubPath, toDownload);
downloader.downloadFiles(toDownload);
- LOGGER.info("Finished downloading metadata files for {}", indexSubPath);
+ LOGGER.debug("Finished downloading metadata files for {}", indexSubPath);
uncachedMetadataFiles.removeAll(toDownload);
return isEmpty();
}
@@ -149,7 +162,7 @@
LOGGER.info("Parallel cacher was closed");
}
- private Set<FileReference> getFiles(List<FileReference> uncachedFiles, FilenameFilter filter) {
+ public static Set<FileReference> getFiles(List<FileReference> uncachedFiles, FilenameFilter filter) {
Set<FileReference> fileReferences = ConcurrentHashMap.newKeySet();
for (FileReference fileReference : uncachedFiles) {
if (filter.accept(null, fileReference.getName())) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
index 3dd579b..bfa353a 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
@@ -26,17 +26,21 @@
import org.apache.asterix.cloud.bulk.IBulkOperationCallBack;
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.asterix.cloud.lazy.IParallelCacher;
+import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
/**
* ReplaceableCloudAccessor will be used when some (or all) of the files in the cloud storage are not cached locally.
* It will be replaced by {@link LocalAccessor} once everything is cached
*/
public class ReplaceableCloudAccessor extends AbstractLazyAccessor {
+ private static final Logger LOGGER = LogManager.getLogger();
private final Set<Integer> partitions;
private final ILazyAccessorReplacer replacer;
private final IParallelCacher cacher;
@@ -78,10 +82,18 @@
@Override
public Set<FileReference> doList(FileReference dir, FilenameFilter filter) throws HyracksDataException {
- if (cacher.isCached(dir)) {
- return localIoManager.list(dir, filter);
+ if (isTxnDir(dir)) {
+ return cloudBackedList(dir, filter);
}
- return cloudBackedList(dir, filter);
+ Set<FileReference> localList = localIoManager.list(dir, filter);
+ Set<FileReference> uncachedFiles = cacher.getUncachedFiles(dir, filter);
+ localList.addAll(uncachedFiles);
+ return localList;
+ }
+
+ private static boolean isTxnDir(FileReference dir) {
+ return dir.getRelativePath().startsWith(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME)
+ || dir.getName().equals(StorageConstants.GLOBAL_TXN_DIR_NAME);
}
@Override
@@ -129,6 +141,7 @@
}
private Set<FileReference> cloudBackedList(FileReference dir, FilenameFilter filter) throws HyracksDataException {
+ LOGGER.debug("CLOUD LIST: {}", dir);
Set<String> cloudFiles = cloudClient.listObjects(bucket, dir.getRelativePath(), filter);
if (cloudFiles.isEmpty()) {
return Collections.emptySet();
@@ -151,7 +164,7 @@
// Add the remaining files that are not stored locally in their designated partitions (if any)
for (String cloudFile : cloudFiles) {
FileReference localFile = localIoManager.resolve(cloudFile);
- if (isInNodePartition(cloudFile) && dir.getDeviceHandle().equals(localFile.getDeviceHandle())) {
+ if (isInNodePartition(cloudFile) && StoragePathUtil.hasSameStorageRoot(dir, localFile)) {
localFiles.add(localFile);
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index ed9c48e..5dcaaf4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -31,6 +31,7 @@
public class StorageConstants {
public static final String METADATA_TXN_NOWAL_DIR_NAME = "mtd-txn-logs";
+ public static final String GLOBAL_TXN_DIR_NAME = ".";
public static final String STORAGE_ROOT_DIR_NAME = "storage";
public static final String INGESTION_LOGS_DIR_NAME = "ingestion_logs";
public static final String PARTITION_DIR_PREFIX = "partition_";
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 9862be7..e9a8753 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -218,4 +218,12 @@
}
return relativePath.substring(start, end);
}
+
+ public static boolean hasSameStorageRoot(FileReference file1, FileReference file2) {
+ return file1.getDeviceHandle().equals(file2.getDeviceHandle());
+ }
+
+ public static boolean isRelativeParent(FileReference parent, FileReference child) {
+ return child.getRelativePath().startsWith(parent.getRelativePath());
+ }
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 33cbf2d..96ae699 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -566,6 +566,7 @@
throw HyracksDataException.create(e);
}
} finally {
+ clearResourcesCache();
afterReadAccess();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/LocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/LocalResource.java
index f5e2945..7722fb1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/LocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/LocalResource.java
@@ -106,7 +106,7 @@
@Override
public String toString() {
return new StringBuilder("{\"").append(LocalResource.class.getSimpleName()).append("\" : ").append("{\"id\" = ")
- .append(id).append(", \"resource\" : ").append(resource).append(", \"version\" : ").append(version)
+ .append(id).append(", \"resource\" : ").append(getPath()).append(", \"version\" : ").append(version)
.append(" } ").toString();
}