[NO ISSUE][STO] Download metadata files on lazy caching

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- Download metadata files for the node storage partitions
  on bootstrap on lazy caching.
- Use the local + uncached files list to perform list operations
  rather than calling the cloud API.
- Delay partition clean up on promotion for cloud deployments.

Change-Id: I8ba767d5ffa0257a429af8c455bacea3df7ff7a7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17856
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
index b4223bc..0786895 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
@@ -32,6 +32,7 @@
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.common.transactions.IGlobalTransactionContext;
+import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.transaction.management.service.transaction.GlobalTransactionContext;
 import org.apache.asterix.transaction.management.service.transaction.GlobalTxInfo;
 import org.apache.hyracks.api.application.ICCServiceContext;
@@ -182,7 +183,7 @@
 
     @Override
     public void rollback() throws Exception {
-        Set<FileReference> txnLogFileRefs = ioManager.list(ioManager.resolve("."));
+        Set<FileReference> txnLogFileRefs = ioManager.list(ioManager.resolve(StorageConstants.GLOBAL_TXN_DIR_NAME));
         for (FileReference txnLogFileRef : txnLogFileRefs) {
             IGlobalTransactionContext context = new GlobalTransactionContext(txnLogFileRef, ioManager);
             txnContextRepository.put(context.getJobId(), context);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index 0684442..101b1b0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -129,9 +129,8 @@
         LOGGER.warn("promoting partition {}", partition);
         final PersistentLocalResourceRepository localResourceRepository =
                 (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
-        localResourceRepository.cleanup(partition);
-        localResourceRepository.clearResourcesCache();
         if (!appCtx.isCloudDeployment()) {
+            localResourceRepository.cleanup(partition);
             final IRecoveryManager recoveryManager = appCtx.getTransactionSubsystem().getRecoveryManager();
             recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()), true);
         }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index 5d0901d..4135f35 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -219,7 +219,7 @@
     protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state,
             Set<Integer> activePartitions) {
         final List<INCLifecycleTask> tasks = new ArrayList<>();
-        Set<Integer> nodeActivePartitions = getNodeActivePartitions(newNodeId, activePartitions, metadataNode);
+        Set<Integer> nodeActivePartitions = getNodeActivePartitions(newNodeId, activePartitions, metadataNode, state);
         tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING, nodeActivePartitions));
         int metadataPartitionId = clusterManager.getMetadataPartition().getPartitionId();
         // Add any cloud-related tasks
@@ -322,7 +322,8 @@
         return true;
     }
 
-    protected Set<Integer> getNodeActivePartitions(String nodeId, Set<Integer> nodePartitions, boolean metadataNode) {
+    protected Set<Integer> getNodeActivePartitions(String nodeId, Set<Integer> nodePartitions, boolean metadataNode,
+            SystemState state) {
         if (metadataNode) {
             nodePartitions.add(clusterManager.getMetadataPartition().getPartitionId());
         }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 1a99a34..97a6173 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.cloud;
 
+import static org.apache.asterix.cloud.lazy.ParallelCacher.METADATA_FILTER;
 import static org.apache.asterix.common.utils.StorageConstants.PARTITION_DIR_PREFIX;
 import static org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
 
@@ -79,7 +80,8 @@
      */
 
     @Override
-    protected void downloadPartitions(boolean metadataNode, int metadataPartition) throws HyracksDataException {
+    protected synchronized void downloadPartitions(boolean metadataNode, int metadataPartition)
+            throws HyracksDataException {
         // Get the files in all relevant partitions from the cloud
         Set<String> cloudFiles = cloudClient.listObjects(bucket, STORAGE_ROOT_DIR_NAME, IoUtil.NO_OP_FILTER).stream()
                 .filter(f -> partitions.contains(StoragePathUtil.getPartitionNumFromRelativePath(f)))
@@ -99,12 +101,15 @@
         cloudFiles.removeAll(localFiles);
         int remainingUncachedFiles = cloudFiles.size();
         if (remainingUncachedFiles > 0) {
+            LOGGER.debug("The number of uncached files: {}. Uncached files: {}", remainingUncachedFiles, cloudFiles);
             // Get list of FileReferences from the list of cloud (i.e., resolve each path's string to FileReference)
             List<FileReference> uncachedFiles = resolve(cloudFiles);
             // Create a parallel downloader using the given cloudClient
             IParallelDownloader downloader = cloudClient.createParallelDownloader(bucket, localIoManager);
             // Download metadata partition (if this node is a metadata node)
             downloadMetadataPartition(downloader, uncachedFiles, metadataNode, metadataPartition);
+            // Download all metadata files to avoid (List) calls to the cloud when listing/reading these files
+            downloadMetadataFiles(downloader, uncachedFiles);
             // Create a parallel cacher which download and monitor all uncached files
             ParallelCacher cacher = new ParallelCacher(downloader, uncachedFiles);
             // Local cache misses some files, cloud-based accessor is needed for read operations
@@ -113,20 +118,18 @@
             // Everything is cached, no need to invoke cloud-based accessor for read operations
             accessor = new LocalAccessor(cloudClient, bucket, localIoManager);
         }
-
-        LOGGER.info("The number of uncached files: {}. Uncached files: {}", remainingUncachedFiles, cloudFiles);
     }
 
     private void downloadMetadataPartition(IParallelDownloader downloader, List<FileReference> uncachedFiles,
             boolean metadataNode, int metadataPartition) throws HyracksDataException {
         String partitionDir = PARTITION_DIR_PREFIX + metadataPartition;
         if (metadataNode && uncachedFiles.stream().anyMatch(f -> f.getRelativePath().contains(partitionDir))) {
-            LOGGER.info("Downloading metadata partition {}, Current uncached files: {}", metadataPartition,
+            LOGGER.debug("Downloading metadata partition {}, Current uncached files: {}", metadataPartition,
                     uncachedFiles);
             FileReference metadataDir = resolve(STORAGE_ROOT_DIR_NAME + File.separator + partitionDir);
             downloader.downloadDirectories(Collections.singleton(metadataDir));
             uncachedFiles.removeIf(f -> f.getRelativePath().contains(partitionDir));
-            LOGGER.info("Finished downloading metadata partition. Current uncached files: {}", uncachedFiles);
+            LOGGER.debug("Finished downloading metadata partition. Current uncached files: {}", uncachedFiles);
         }
     }
 
@@ -192,4 +195,19 @@
             LOGGER.debug("{} {}", op, fileReference.getRelativePath());
         }
     }
+
+    private void downloadMetadataFiles(IParallelDownloader downloader, List<FileReference> uncachedFiles)
+            throws HyracksDataException {
+        Set<FileReference> uncachedMetadataFiles = ParallelCacher.getFiles(uncachedFiles, METADATA_FILTER);
+        if (!uncachedMetadataFiles.isEmpty()) {
+            LOGGER.debug("Downloading metadata files for all partitions; current uncached files: {}", uncachedFiles);
+            downloader.downloadFiles(uncachedMetadataFiles);
+            uncachedFiles.removeAll(uncachedMetadataFiles);
+            LOGGER.debug("Finished downloading metadata files for all partitions. Current uncached files: {}",
+                    uncachedFiles);
+        } else {
+            LOGGER.debug("all metadata files for all partitions are already cached; current uncached files: {} ",
+                    uncachedFiles);
+        }
+    }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java
index 7e4bc4f..6600356 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java
@@ -18,7 +18,9 @@
  */
 package org.apache.asterix.cloud.lazy;
 
+import java.io.FilenameFilter;
 import java.util.Collection;
+import java.util.Set;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
@@ -32,6 +34,8 @@
      */
     boolean isCached(FileReference indexDir);
 
+    Set<FileReference> getUncachedFiles(FileReference dir, FilenameFilter filter);
+
     /**
      * Downloads all index's data files for all partitions.
      * The index is inferred from the path of the provided file.
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/NoOpParallelCacher.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/NoOpParallelCacher.java
index 010433d..a77652c 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/NoOpParallelCacher.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/NoOpParallelCacher.java
@@ -18,7 +18,10 @@
  */
 package org.apache.asterix.cloud.lazy;
 
+import java.io.FilenameFilter;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
 
 import org.apache.hyracks.api.io.FileReference;
 
@@ -31,6 +34,11 @@
     }
 
     @Override
+    public Set<FileReference> getUncachedFiles(FileReference dir, FilenameFilter filter) {
+        return Collections.emptySet();
+    }
+
+    @Override
     public boolean downloadData(FileReference indexFile) {
         return false;
     }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
index 2d55d06..3cc481e 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
@@ -26,6 +26,7 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.cloud.clients.IParallelDownloader;
 import org.apache.asterix.common.utils.StorageConstants;
@@ -41,12 +42,11 @@
  * @see org.apache.asterix.cloud.lazy.accessor.ReplaceableCloudAccessor
  */
 public final class ParallelCacher implements IParallelCacher {
-    private static final Logger LOGGER = LogManager.getLogger();
-    private static final FilenameFilter METADATA_FILTER =
+
+    public static final FilenameFilter METADATA_FILTER =
             ((dir, name) -> name.startsWith(StorageConstants.INDEX_NON_DATA_FILES_PREFIX));
-
+    private static final Logger LOGGER = LogManager.getLogger();
     private final IParallelDownloader downloader;
-
     /**
      * Uncached Indexes subpaths
      */
@@ -83,6 +83,19 @@
     }
 
     @Override
+    public Set<FileReference> getUncachedFiles(FileReference dir, FilenameFilter filter) {
+        if (dir.getRelativePath().endsWith(StorageConstants.STORAGE_ROOT_DIR_NAME)) {
+            return uncachedDataFiles.stream()
+                    .filter(f -> StoragePathUtil.hasSameStorageRoot(dir, f) && filter.accept(null, f.getName()))
+                    .collect(Collectors.toSet());
+        }
+        return uncachedDataFiles
+                .stream().filter(f -> StoragePathUtil.hasSameStorageRoot(dir, f)
+                        && StoragePathUtil.isRelativeParent(dir, f) && filter.accept(null, f.getName()))
+                .collect(Collectors.toSet());
+    }
+
+    @Override
     public synchronized boolean downloadData(FileReference indexFile) throws HyracksDataException {
         String indexSubPath = StoragePathUtil.getIndexSubPath(indexFile, false);
         Set<FileReference> toDownload = new HashSet<>();
@@ -92,13 +105,13 @@
             }
         }
 
-        LOGGER.info("Downloading data files for {} in all partitions: {}", indexSubPath, toDownload);
+        LOGGER.debug("Downloading data files for {} in all partitions: {}", indexSubPath, toDownload);
         Collection<FileReference> failed = downloader.downloadDirectories(toDownload);
         if (!failed.isEmpty()) {
             LOGGER.warn("Failed to download data files {}. Re-downloading: {}", indexSubPath, failed);
             downloader.downloadFiles(failed);
         }
-        LOGGER.info("Finished downloading data files for {}", indexSubPath);
+        LOGGER.debug("Finished downloading data files for {}", indexSubPath);
         uncachedIndexes.remove(indexSubPath);
         uncachedDataFiles.removeIf(f -> f.getRelativePath().contains(indexSubPath));
         return isEmpty();
@@ -114,9 +127,9 @@
             }
         }
 
-        LOGGER.info("Downloading metadata files for {} in all partitions: {}", indexSubPath, toDownload);
+        LOGGER.debug("Downloading metadata files for {} in all partitions: {}", indexSubPath, toDownload);
         downloader.downloadFiles(toDownload);
-        LOGGER.info("Finished downloading metadata files for {}", indexSubPath);
+        LOGGER.debug("Finished downloading metadata files for {}", indexSubPath);
         uncachedMetadataFiles.removeAll(toDownload);
         return isEmpty();
     }
@@ -149,7 +162,7 @@
         LOGGER.info("Parallel cacher was closed");
     }
 
-    private Set<FileReference> getFiles(List<FileReference> uncachedFiles, FilenameFilter filter) {
+    public static Set<FileReference> getFiles(List<FileReference> uncachedFiles, FilenameFilter filter) {
         Set<FileReference> fileReferences = ConcurrentHashMap.newKeySet();
         for (FileReference fileReference : uncachedFiles) {
             if (filter.accept(null, fileReference.getName())) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
index 3dd579b..bfa353a 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
@@ -26,17 +26,21 @@
 import org.apache.asterix.cloud.bulk.IBulkOperationCallBack;
 import org.apache.asterix.cloud.clients.ICloudClient;
 import org.apache.asterix.cloud.lazy.IParallelCacher;
+import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 /**
  * ReplaceableCloudAccessor will be used when some (or all) of the files in the cloud storage are not cached locally.
  * It will be replaced by {@link LocalAccessor} once everything is cached
  */
 public class ReplaceableCloudAccessor extends AbstractLazyAccessor {
+    private static final Logger LOGGER = LogManager.getLogger();
     private final Set<Integer> partitions;
     private final ILazyAccessorReplacer replacer;
     private final IParallelCacher cacher;
@@ -78,10 +82,18 @@
 
     @Override
     public Set<FileReference> doList(FileReference dir, FilenameFilter filter) throws HyracksDataException {
-        if (cacher.isCached(dir)) {
-            return localIoManager.list(dir, filter);
+        if (isTxnDir(dir)) {
+            return cloudBackedList(dir, filter);
         }
-        return cloudBackedList(dir, filter);
+        Set<FileReference> localList = localIoManager.list(dir, filter);
+        Set<FileReference> uncachedFiles = cacher.getUncachedFiles(dir, filter);
+        localList.addAll(uncachedFiles);
+        return localList;
+    }
+
+    private static boolean isTxnDir(FileReference dir) {
+        return dir.getRelativePath().startsWith(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME)
+                || dir.getName().equals(StorageConstants.GLOBAL_TXN_DIR_NAME);
     }
 
     @Override
@@ -129,6 +141,7 @@
     }
 
     private Set<FileReference> cloudBackedList(FileReference dir, FilenameFilter filter) throws HyracksDataException {
+        LOGGER.debug("CLOUD LIST: {}", dir);
         Set<String> cloudFiles = cloudClient.listObjects(bucket, dir.getRelativePath(), filter);
         if (cloudFiles.isEmpty()) {
             return Collections.emptySet();
@@ -151,7 +164,7 @@
         // Add the remaining files that are not stored locally in their designated partitions (if any)
         for (String cloudFile : cloudFiles) {
             FileReference localFile = localIoManager.resolve(cloudFile);
-            if (isInNodePartition(cloudFile) && dir.getDeviceHandle().equals(localFile.getDeviceHandle())) {
+            if (isInNodePartition(cloudFile) && StoragePathUtil.hasSameStorageRoot(dir, localFile)) {
                 localFiles.add(localFile);
             }
         }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index ed9c48e..5dcaaf4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -31,6 +31,7 @@
 public class StorageConstants {
 
     public static final String METADATA_TXN_NOWAL_DIR_NAME = "mtd-txn-logs";
+    public static final String GLOBAL_TXN_DIR_NAME = ".";
     public static final String STORAGE_ROOT_DIR_NAME = "storage";
     public static final String INGESTION_LOGS_DIR_NAME = "ingestion_logs";
     public static final String PARTITION_DIR_PREFIX = "partition_";
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 9862be7..e9a8753 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -218,4 +218,12 @@
         }
         return relativePath.substring(start, end);
     }
+
+    public static boolean hasSameStorageRoot(FileReference file1, FileReference file2) {
+        return file1.getDeviceHandle().equals(file2.getDeviceHandle());
+    }
+
+    public static boolean isRelativeParent(FileReference parent, FileReference child) {
+        return child.getRelativePath().startsWith(parent.getRelativePath());
+    }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 33cbf2d..96ae699 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -566,6 +566,7 @@
                 throw HyracksDataException.create(e);
             }
         } finally {
+            clearResourcesCache();
             afterReadAccess();
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/LocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/LocalResource.java
index f5e2945..7722fb1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/LocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/LocalResource.java
@@ -106,7 +106,7 @@
     @Override
     public String toString() {
         return new StringBuilder("{\"").append(LocalResource.class.getSimpleName()).append("\" : ").append("{\"id\" = ")
-                .append(id).append(", \"resource\" : ").append(resource).append(", \"version\" : ").append(version)
+                .append(id).append(", \"resource\" : ").append(getPath()).append(", \"version\" : ").append(version)
                 .append(" } ").toString();
     }