[ASTERIXDB-3268][STO] Add parallel downloader/cacher

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
Download uncached files in parallel. Also, when listing
files, we will ignore consulting with the cloud store
for downloaded indexes.

Change-Id: I5f48da0f506200f35fd9b08e51983ef7ad919f6d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17791
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-cloud/pom.xml b/asterixdb/asterix-cloud/pom.xml
index 542e6ae..9e840c2 100644
--- a/asterixdb/asterix-cloud/pom.xml
+++ b/asterixdb/asterix-cloud/pom.xml
@@ -94,7 +94,6 @@
     <dependency>
       <groupId>software.amazon.awssdk.crt</groupId>
       <artifactId>aws-crt</artifactId>
-      <version>0.21.10</version>
     </dependency>
     <dependency>
       <groupId>junit</groupId>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index d9c248e..7243188 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -139,6 +139,7 @@
                 }
             }
         } else {
+            LOGGER.info("Cleaning node partitions...");
             for (FileReference partitionPath : partitionPaths) {
                 CloudFileUtil.cleanDirectoryFiles(localIoManager, cloudFiles, partitionPath);
             }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
index f869f37..c095d87 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
@@ -22,11 +22,10 @@
 
 import java.io.File;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.asterix.cloud.clients.IParallelDownloader;
 import org.apache.asterix.common.config.CloudProperties;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
@@ -57,14 +56,10 @@
 
     @Override
     protected void downloadPartitions() throws HyracksDataException {
-        // TODO currently it throws an error in local test
-        Map<String, String> cloudToLocalStoragePaths = new HashMap<>();
-        for (FileReference partitionPath : partitionPaths) {
-            String cloudStoragePath = STORAGE_ROOT_DIR_NAME + "/" + partitionPath.getName();
-            cloudToLocalStoragePaths.put(cloudStoragePath, partitionPath.getAbsolutePath());
-        }
-        LOGGER.info("Resolved paths to io devices: {}", cloudToLocalStoragePaths);
-        cloudClient.syncFiles(bucket, cloudToLocalStoragePaths);
+        IParallelDownloader downloader = cloudClient.createParallelDownloader(bucket, localIoManager);
+        LOGGER.info("Downloading all files located in {}", partitionPaths);
+        downloader.downloadDirectories(partitionPaths);
+        LOGGER.info("Finished downloading {}", partitionPaths);
     }
 
     @Override
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 01f684b..6e00817 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -21,11 +21,15 @@
 import static org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
 
 import java.io.FilenameFilter;
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
+import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.lazy.ParallelCacher;
 import org.apache.asterix.cloud.lazy.accessor.ILazyAccessor;
 import org.apache.asterix.cloud.lazy.accessor.ILazyAccessorReplacer;
 import org.apache.asterix.cloud.lazy.accessor.InitialCloudAccessor;
@@ -54,7 +58,7 @@
 
     public LazyCloudIOManager(IOManager ioManager, CloudProperties cloudProperties) throws HyracksDataException {
         super(ioManager, cloudProperties);
-        accessor = new InitialCloudAccessor(cloudClient, bucket, localIoManager, writeBufferProvider);
+        accessor = new InitialCloudAccessor(cloudClient, bucket, localIoManager);
         replacer = () -> {
             synchronized (this) {
                 if (!accessor.isLocalAccessor()) {
@@ -92,9 +96,11 @@
         cloudFiles.removeAll(localFiles);
         int remainingUncachedFiles = cloudFiles.size();
         if (remainingUncachedFiles > 0) {
+            List<FileReference> uncachedFiles = resolve(cloudFiles);
+            IParallelDownloader downloader = cloudClient.createParallelDownloader(bucket, localIoManager);
+            ParallelCacher cacher = new ParallelCacher(downloader, uncachedFiles);
             // Local cache misses some files, cloud-based accessor is needed for read operations
-            accessor = new ReplaceableCloudAccessor(cloudClient, bucket, localIoManager, partitions,
-                    remainingUncachedFiles, writeBufferProvider, replacer);
+            accessor = new ReplaceableCloudAccessor(cloudClient, bucket, localIoManager, partitions, replacer, cacher);
         } else {
             // Everything is cached, no need to invoke cloud-based accessor for read operations
             accessor = new LocalAccessor(cloudClient, bucket, localIoManager);
@@ -152,6 +158,14 @@
         log("WRITE", fileRef);
     }
 
+    private List<FileReference> resolve(Set<String> cloudFiles) throws HyracksDataException {
+        List<FileReference> fileReferences = new ArrayList<>();
+        for (String file : cloudFiles) {
+            fileReferences.add(resolve(file));
+        }
+        return fileReferences;
+    }
+
     private void log(String op, FileReference fileReference) {
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("{} {}", op, fileReference.getRelativePath());
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
index 3135624..1c7713a 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
@@ -50,13 +50,17 @@
          *      Actually, is there a case where we delete multiple directories from the cloud?
          */
         List<String> paths = fileReferences.stream().map(FileReference::getRelativePath).collect(Collectors.toList());
+        if (paths.isEmpty()) {
+            return 0;
+        }
+
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Bulk deleting: local: {}, cloud: {}", fileReferences, paths);
         }
         cloudClient.deleteObjects(bucket, paths);
         // Bulk delete locally as well
-        int localDeletes = super.performOperation();
-        callBack.call(localDeletes, paths);
-        return paths.size();
+        super.performOperation();
+        callBack.call(fileReferences);
+        return fileReferences.size();
     }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/IBulkOperationCallBack.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/IBulkOperationCallBack.java
index 14a0c4e..f0b336d 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/IBulkOperationCallBack.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/IBulkOperationCallBack.java
@@ -18,8 +18,10 @@
  */
 package org.apache.asterix.cloud.bulk;
 
-import java.util.Collection;
+import java.util.List;
+
+import org.apache.hyracks.api.io.FileReference;
 
 public interface IBulkOperationCallBack {
-    void call(int numberOfAffectedLocalFiles, Collection<String> paths);
+    void call(List<FileReference> fileReferences);
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/NoOpDeleteBulkCallBack.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/NoOpDeleteBulkCallBack.java
index c877be2..6e89cb0 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/NoOpDeleteBulkCallBack.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/NoOpDeleteBulkCallBack.java
@@ -18,7 +18,9 @@
  */
 package org.apache.asterix.cloud.bulk;
 
-import java.util.Collection;
+import java.util.List;
+
+import org.apache.hyracks.api.io.FileReference;
 
 public class NoOpDeleteBulkCallBack implements IBulkOperationCallBack {
     public static final IBulkOperationCallBack INSTANCE = new NoOpDeleteBulkCallBack();
@@ -27,7 +29,7 @@
     }
 
     @Override
-    public void call(int numberOfAffectedLocalFiles, Collection<String> paths) {
+    public void call(List<FileReference> fileReferences) {
         // NoOp
     }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
index f2b7ff4..76a768a 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
@@ -22,11 +22,11 @@
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.Collection;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.IOManager;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -131,13 +131,13 @@
     boolean exists(String bucket, String path) throws HyracksDataException;
 
     /**
-     * Syncs files by downloading them from cloud storage to local storage
+     * Create a parallel downloader
      *
-     * @param bucket                   bucket to sync from
-     * @param cloudToLocalStoragePaths map of cloud storage partition to local storage path
-     * @throws HyracksDataException HyracksDataException
+     * @param bucket    bucket
+     * @param ioManager local {@link IOManager}
+     * @return an instance of a new parallel downloader
      */
-    void syncFiles(String bucket, Map<String, String> cloudToLocalStoragePaths) throws HyracksDataException;
+    IParallelDownloader createParallelDownloader(String bucket, IOManager ioManager);
 
     /**
      * Produces a {@link JsonNode} that contains information about the stored objects in the cloud
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
new file mode 100644
index 0000000..32d0a74
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients;
+
+import java.util.Collection;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+
+public interface IParallelDownloader {
+
+    /**
+     * Downloads files in all partitions
+     *
+     * @param toDownload all files to be downloaded
+     */
+    void downloadFiles(Collection<FileReference> toDownload) throws HyracksDataException;
+
+    /**
+     * Downloads files in all partitions
+     *
+     * @param toDownload all files to be downloaded
+     * @return file that failed to download
+     */
+    Collection<FileReference> downloadDirectories(Collection<FileReference> toDownload) throws HyracksDataException;
+
+    /**
+     * Close the downloader and release all of its resources
+     */
+    void close();
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index fe1f4af..56ed3cf 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -25,7 +25,8 @@
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
 
 public class S3ClientConfig {
-
+    // The maximum number of file that can be deleted (AWS restriction)
+    static final int DELETE_BATCH_SIZE = 1000;
     private final String region;
     private final String endpoint;
     private final String prefix;
@@ -59,7 +60,7 @@
         return prefix;
     }
 
-    public boolean isEncodeKeys() {
+    public boolean isLocalS3Provider() {
         // to workaround https://github.com/findify/s3mock/issues/187 in our S3Mock, we encode/decode keys
         return isS3Mock();
     }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 5faf663..02e1d4f 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -18,40 +18,33 @@
  */
 package org.apache.asterix.cloud.clients.aws.s3;
 
+import static org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig.DELETE_BATCH_SIZE;
 import static org.apache.asterix.cloud.clients.aws.s3.S3Utils.encodeURI;
 import static org.apache.asterix.cloud.clients.aws.s3.S3Utils.listS3Objects;
 
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 
 import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
 import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.IParallelDownloader;
 import org.apache.asterix.cloud.clients.profiler.CountRequestProfiler;
 import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
 import org.apache.asterix.cloud.clients.profiler.NoOpRequestProfiler;
-import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.util.IoUtil;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.apache.hyracks.control.nc.io.IOManager;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -61,10 +54,8 @@
 import software.amazon.awssdk.core.ResponseInputStream;
 import software.amazon.awssdk.core.sync.RequestBody;
 import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.s3.S3AsyncClient;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.S3ClientBuilder;
-import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;
 import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
 import software.amazon.awssdk.services.s3.model.Delete;
 import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
@@ -75,23 +66,11 @@
 import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import software.amazon.awssdk.services.s3.model.S3Object;
-import software.amazon.awssdk.transfer.s3.S3TransferManager;
-import software.amazon.awssdk.transfer.s3.model.CompletedDirectoryDownload;
-import software.amazon.awssdk.transfer.s3.model.DirectoryDownload;
-import software.amazon.awssdk.transfer.s3.model.DownloadDirectoryRequest;
 
 public class S3CloudClient implements ICloudClient {
-
-    private static final Logger LOGGER = LogManager.getLogger();
-    // TODO(htowaileb): Temporary variables, can we get this from the used instance?
-    private static final double MAX_HOST_BANDWIDTH = 10.0; // in Gbps
-    // The maximum number of file that can be deleted (AWS restriction)
-    private static final int DELETE_BATCH_SIZE = 1000;
-
     private final S3ClientConfig config;
     private final S3Client s3Client;
     private final IRequestProfiler profiler;
-    private S3TransferManager s3TransferManager;
 
     public S3CloudClient(S3ClientConfig config) {
         this.config = config;
@@ -113,7 +92,7 @@
     @Override
     public Set<String> listObjects(String bucket, String path, FilenameFilter filter) {
         profiler.objectsList();
-        path = config.isEncodeKeys() ? encodeURI(path) : path;
+        path = config.isLocalS3Provider() ? encodeURI(path) : path;
         return filterAndGet(listS3Objects(s3Client, bucket, path), filter);
     }
 
@@ -181,7 +160,7 @@
 
     @Override
     public void copy(String bucket, String srcPath, FileReference destPath) {
-        srcPath = config.isEncodeKeys() ? encodeURI(srcPath) : srcPath;
+        srcPath = config.isLocalS3Provider() ? encodeURI(srcPath) : srcPath;
         List<S3Object> objects = listS3Objects(s3Client, bucket, srcPath);
 
         profiler.objectsList();
@@ -243,50 +222,8 @@
     }
 
     @Override
-    public void syncFiles(String bucket, Map<String, String> cloudToLocalStoragePaths) throws HyracksDataException {
-        LOGGER.info("Syncing cloud storage to local storage started");
-
-        S3TransferManager s3TransferManager = getS3TransferManager();
-
-        List<CompletableFuture<CompletedDirectoryDownload>> downloads = new ArrayList<>();
-        cloudToLocalStoragePaths.forEach((cloudStoragePath, localStoragePath) -> {
-            DownloadDirectoryRequest.Builder builder = DownloadDirectoryRequest.builder();
-            builder.bucket(bucket);
-            builder.destination(Paths.get(localStoragePath));
-            builder.listObjectsV2RequestTransformer(l -> l.prefix(cloudStoragePath));
-
-            LOGGER.info("TransferManager started downloading from cloud \"{}\" to local storage \"{}\"",
-                    cloudStoragePath, localStoragePath);
-            DirectoryDownload directoryDownload = s3TransferManager.downloadDirectory(builder.build());
-            downloads.add(directoryDownload.completionFuture());
-        });
-
-        try {
-            for (CompletableFuture<CompletedDirectoryDownload> download : downloads) {
-                // multipart download
-                profiler.objectMultipartDownload();
-                download.join();
-                CompletedDirectoryDownload completedDirectoryDownload = download.get();
-
-                // if we have failed downloads with transfer manager, try to download them with GetObject
-                if (!completedDirectoryDownload.failedTransfers().isEmpty()) {
-                    LOGGER.warn("TransferManager failed to download file(s), will retry to download each separately");
-                    completedDirectoryDownload.failedTransfers().forEach(LOGGER::warn);
-
-                    Map<String, String> failedFiles = new HashMap<>();
-                    completedDirectoryDownload.failedTransfers().forEach(failed -> {
-                        String cloudStoragePath = failed.request().getObjectRequest().key();
-                        String localStoragePath = failed.request().destination().toAbsolutePath().toString();
-                        failedFiles.put(cloudStoragePath, localStoragePath);
-                    });
-                    downloadFiles(bucket, failedFiles);
-                }
-                LOGGER.info("TransferManager finished downloading {} to local storage", completedDirectoryDownload);
-            }
-        } catch (ExecutionException | InterruptedException e) {
-            throw HyracksDataException.create(e);
-        }
-        LOGGER.info("Syncing cloud storage to local storage successful");
+    public IParallelDownloader createParallelDownloader(String bucket, IOManager ioManager) {
+        return new S3ParallelDownloader(bucket, ioManager, config, profiler);
     }
 
     @Override
@@ -305,13 +242,7 @@
 
     @Override
     public void close() {
-        if (s3Client != null) {
-            s3Client.close();
-        }
-
-        if (s3TransferManager != null) {
-            s3TransferManager.close();
-        }
+        s3Client.close();
     }
 
     private S3Client buildClient() {
@@ -333,65 +264,11 @@
     private Set<String> filterAndGet(List<S3Object> contents, FilenameFilter filter) {
         Set<String> files = new HashSet<>();
         for (S3Object s3Object : contents) {
-            String path = config.isEncodeKeys() ? S3Utils.decodeURI(s3Object.key()) : s3Object.key();
+            String path = config.isLocalS3Provider() ? S3Utils.decodeURI(s3Object.key()) : s3Object.key();
             if (filter.accept(null, IoUtil.getFileNameFromPath(path))) {
                 files.add(path);
             }
         }
         return files;
     }
-
-    private void downloadFiles(String bucket, Map<String, String> cloudToLocalStoragePaths)
-            throws HyracksDataException {
-        byte[] buffer = new byte[8 * 1024];
-        for (Map.Entry<String, String> entry : cloudToLocalStoragePaths.entrySet()) {
-            String cloudStoragePath = entry.getKey();
-            String localStoragePath = entry.getValue();
-
-            LOGGER.info("GetObject started downloading from cloud \"{}\" to local storage \"{}\"", cloudStoragePath,
-                    localStoragePath);
-
-            // TODO(htowaileb): add retry logic here
-            try {
-                File localFile = new File(localStoragePath);
-                FileUtils.createParentDirectories(localFile);
-                if (!localFile.createNewFile()) {
-                    // do nothing for now, a restart has the files when trying to flush, for testing
-                    //throw new IllegalStateException("Couldn't create local file");
-                }
-
-                try (InputStream inputStream = getObjectStream(bucket, cloudStoragePath);
-                        FileOutputStream outputStream = new FileOutputStream(localFile)) {
-                    int bytesRead;
-                    while ((bytesRead = inputStream.read(buffer)) != -1) {
-                        outputStream.write(buffer, 0, bytesRead);
-                    }
-                }
-            } catch (IOException ex) {
-                throw HyracksDataException.create(ex);
-            }
-            LOGGER.info("GetObject successful downloading from cloud \"{}\" to local storage \"{}\"", cloudStoragePath,
-                    localStoragePath);
-        }
-    }
-
-    private S3TransferManager getS3TransferManager() {
-        if (s3TransferManager != null) {
-            return s3TransferManager;
-        }
-
-        S3CrtAsyncClientBuilder builder = S3AsyncClient.crtBuilder();
-        builder.credentialsProvider(config.createCredentialsProvider());
-        builder.region(Region.of(config.getRegion()));
-        builder.targetThroughputInGbps(MAX_HOST_BANDWIDTH);
-        builder.minimumPartSizeInBytes((long) 8 * 1024 * 1024);
-
-        if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) {
-            builder.endpointOverride(URI.create(config.getEndpoint()));
-        }
-
-        S3AsyncClient client = builder.build();
-        s3TransferManager = S3TransferManager.builder().s3Client(client).build();
-        return s3TransferManager;
-    }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
new file mode 100644
index 0000000..c5d9b73
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.aws.s3;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
+import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.transfer.s3.S3TransferManager;
+import software.amazon.awssdk.transfer.s3.model.CompletedDirectoryDownload;
+import software.amazon.awssdk.transfer.s3.model.CompletedFileDownload;
+import software.amazon.awssdk.transfer.s3.model.DirectoryDownload;
+import software.amazon.awssdk.transfer.s3.model.DownloadDirectoryRequest;
+import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest;
+import software.amazon.awssdk.transfer.s3.model.FailedFileDownload;
+import software.amazon.awssdk.transfer.s3.model.FileDownload;
+
+@ThreadSafe
+class S3ParallelDownloader implements IParallelDownloader {
+    private final String bucket;
+    private final IOManager ioManager;
+    private final S3AsyncClient s3AsyncClient;
+    private final S3TransferManager transferManager;
+    private final IRequestProfiler profiler;
+
+    S3ParallelDownloader(String bucket, IOManager ioManager, S3ClientConfig config, IRequestProfiler profiler) {
+        this.bucket = bucket;
+        this.ioManager = ioManager;
+        this.profiler = profiler;
+        s3AsyncClient = createAsyncClient(config);
+        transferManager = createS3TransferManager(s3AsyncClient);
+    }
+
+    @Override
+    public void downloadFiles(Collection<FileReference> toDownload) throws HyracksDataException {
+        try {
+            List<CompletableFuture<CompletedFileDownload>> downloads = startDownloadingFiles(toDownload);
+            waitForFileDownloads(downloads);
+        } catch (IOException | ExecutionException | InterruptedException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public Collection<FileReference> downloadDirectories(Collection<FileReference> toDownload)
+            throws HyracksDataException {
+        Set<FileReference> failedFiles;
+        List<CompletableFuture<CompletedDirectoryDownload>> downloads = startDownloadingDirectories(toDownload);
+        try {
+            failedFiles = waitForDirectoryDownloads(downloads);
+        } catch (ExecutionException | InterruptedException e) {
+            throw HyracksDataException.create(e);
+        }
+
+        return failedFiles;
+    }
+
+    @Override
+    public void close() {
+        transferManager.close();
+        s3AsyncClient.close();
+    }
+
+    private List<CompletableFuture<CompletedFileDownload>> startDownloadingFiles(Collection<FileReference> toDownload)
+            throws IOException {
+        List<CompletableFuture<CompletedFileDownload>> downloads = new ArrayList<>();
+        for (FileReference fileReference : toDownload) {
+            // multipart download
+            profiler.objectGet();
+
+            // Create parent directories
+            FileUtils.createParentDirectories(fileReference.getFile());
+
+            // GetObjectRequest
+            GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder();
+            requestBuilder.bucket(bucket);
+            requestBuilder.key(fileReference.getRelativePath());
+
+            // Download object
+            DownloadFileRequest.Builder builder = DownloadFileRequest.builder();
+            builder.getObjectRequest(requestBuilder.build());
+            builder.destination(fileReference.getFile());
+
+            FileDownload fileDownload = transferManager.downloadFile(builder.build());
+            downloads.add(fileDownload.completionFuture());
+        }
+        return downloads;
+    }
+
+    private void waitForFileDownloads(List<CompletableFuture<CompletedFileDownload>> downloads)
+            throws ExecutionException, InterruptedException {
+
+        for (CompletableFuture<CompletedFileDownload> download : downloads) {
+            download.get();
+        }
+    }
+
+    private List<CompletableFuture<CompletedDirectoryDownload>> startDownloadingDirectories(
+            Collection<FileReference> toDownload) {
+        List<CompletableFuture<CompletedDirectoryDownload>> downloads = new ArrayList<>();
+        for (FileReference fileReference : toDownload) {
+            DownloadDirectoryRequest.Builder builder = DownloadDirectoryRequest.builder();
+            builder.bucket(bucket);
+            builder.destination(fileReference.getFile().toPath());
+            builder.listObjectsV2RequestTransformer(l -> l.prefix(fileReference.getRelativePath()));
+            DirectoryDownload directoryDownload = transferManager.downloadDirectory(builder.build());
+            downloads.add(directoryDownload.completionFuture());
+        }
+        return downloads;
+    }
+
+    private Set<FileReference> waitForDirectoryDownloads(List<CompletableFuture<CompletedDirectoryDownload>> downloads)
+            throws ExecutionException, InterruptedException, HyracksDataException {
+        Set<FileReference> failedFiles = Collections.emptySet();
+        for (CompletableFuture<CompletedDirectoryDownload> download : downloads) {
+            // multipart download
+            profiler.objectMultipartDownload();
+            download.join();
+            CompletedDirectoryDownload completedDirectoryDownload = download.get();
+
+            // if we have failed downloads with transfer manager, try to download them with GetObject
+            if (!completedDirectoryDownload.failedTransfers().isEmpty()) {
+                failedFiles = failedFiles.isEmpty() ? new HashSet<>() : failedFiles;
+                for (FailedFileDownload failedFileDownload : completedDirectoryDownload.failedTransfers()) {
+                    FileReference failedFile = ioManager.resolve(failedFileDownload.request().getObjectRequest().key());
+                    failedFiles.add(failedFile);
+                }
+            }
+        }
+        return failedFiles;
+    }
+
+    private static S3AsyncClient createAsyncClient(S3ClientConfig config) {
+        if (config.isLocalS3Provider()) {
+            // CRT client is not supported by S3Mock
+            return createS3AsyncClient(config);
+        } else {
+            // CRT could provide a better performance when used with an actual S3
+            return createS3CrtAsyncClient(config);
+        }
+    }
+
+    private static S3AsyncClient createS3AsyncClient(S3ClientConfig config) {
+        S3AsyncClientBuilder builder = S3AsyncClient.builder();
+        builder.credentialsProvider(config.createCredentialsProvider());
+        builder.region(Region.of(config.getRegion()));
+
+        if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) {
+            builder.endpointOverride(URI.create(config.getEndpoint()));
+        }
+
+        return builder.build();
+    }
+
+    private static S3AsyncClient createS3CrtAsyncClient(S3ClientConfig config) {
+        S3CrtAsyncClientBuilder builder = S3AsyncClient.crtBuilder();
+        builder.credentialsProvider(config.createCredentialsProvider());
+        builder.region(Region.of(config.getRegion()));
+
+        if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) {
+            builder.endpointOverride(URI.create(config.getEndpoint()));
+        }
+
+        return builder.build();
+    }
+
+    private S3TransferManager createS3TransferManager(S3AsyncClient s3AsyncClient) {
+        return S3TransferManager.builder().s3Client(s3AsyncClient).build();
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java
new file mode 100644
index 0000000..7e4bc4f
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy;
+
+import java.util.Collection;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+
+public interface IParallelCacher {
+    /**
+     * Check whether an index file data and metadata are already cached
+     *
+     * @param indexDir index directory
+     * @return true if the index is already cached, false otherwise
+     */
+    boolean isCached(FileReference indexDir);
+
+    /**
+     * Downloads all index's data files for all partitions.
+     * The index is inferred from the path of the provided file.
+     *
+     * @param indexFile a file reference in an index
+     * @return true if the remaining number of uncached files is zero, false otherwise
+     */
+    boolean downloadData(FileReference indexFile) throws HyracksDataException;
+
+    /**
+     * Downloads all index's metadata files for all partitions.
+     * The index is inferred from the path of the provided file.
+     *
+     * @param indexFile a file reference in an index
+     * @return true if the remaining number of uncached files is zero, false otherwise
+     */
+    boolean downloadMetadata(FileReference indexFile) throws HyracksDataException;
+
+    /**
+     * Remove the deleted files from the uncached file set
+     *
+     * @param deletedFiles all deleted files
+     * @return true if the remaining number of uncached files is zero, false otherwise
+     */
+    boolean remove(Collection<FileReference> deletedFiles);
+
+    /**
+     * Remove the deleted file from the uncached file set
+     *
+     * @param deletedFile the deleted file
+     * @return true if the remaining number of uncached files is zero, false otherwise
+     */
+    boolean remove(FileReference deletedFile);
+
+    /**
+     * Close cacher resources
+     */
+    void close();
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/NoOpParallelCacher.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/NoOpParallelCacher.java
new file mode 100644
index 0000000..010433d
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/NoOpParallelCacher.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy;
+
+import java.util.Collection;
+
+import org.apache.hyracks.api.io.FileReference;
+
+public class NoOpParallelCacher implements IParallelCacher {
+    public static final IParallelCacher INSTANCE = new NoOpParallelCacher();
+
+    @Override
+    public boolean isCached(FileReference indexDir) {
+        return false;
+    }
+
+    @Override
+    public boolean downloadData(FileReference indexFile) {
+        return false;
+    }
+
+    @Override
+    public boolean downloadMetadata(FileReference indexFile) {
+        return false;
+    }
+
+    @Override
+    public boolean remove(Collection<FileReference> deletedFiles) {
+        return false;
+    }
+
+    @Override
+    public boolean remove(FileReference deletedFile) {
+        return false;
+    }
+
+    @Override
+    public void close() {
+        // NoOp
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
new file mode 100644
index 0000000..2d55d06
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy;
+
+import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER;
+
+import java.io.FilenameFilter;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * A parallel cacher that maintains and downloads (in parallel) all uncached files
+ *
+ * @see org.apache.asterix.cloud.lazy.accessor.ReplaceableCloudAccessor
+ */
+public final class ParallelCacher implements IParallelCacher {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final FilenameFilter METADATA_FILTER =
+            ((dir, name) -> name.startsWith(StorageConstants.INDEX_NON_DATA_FILES_PREFIX));
+
+    private final IParallelDownloader downloader;
+
+    /**
+     * Uncached Indexes subpaths
+     */
+    private final Set<String> uncachedIndexes;
+
+    /**
+     * All uncached data files
+     * Example: BTree files
+     */
+    private final Set<FileReference> uncachedDataFiles;
+
+    /**
+     * All uncached metadata files
+     * Example: index checkpoint files
+     */
+    private final Set<FileReference> uncachedMetadataFiles;
+
+    public ParallelCacher(IParallelDownloader downloader, List<FileReference> uncachedFiles) {
+        this.downloader = downloader;
+        uncachedDataFiles = getFiles(uncachedFiles, COMPONENT_FILES_FILTER);
+        uncachedMetadataFiles = getFiles(uncachedFiles, METADATA_FILTER);
+        uncachedIndexes = getUncachedIndexes(uncachedFiles);
+    }
+
+    @Override
+    public boolean isCached(FileReference indexDir) {
+        String relativePath = indexDir.getRelativePath();
+        if (relativePath.endsWith(StorageConstants.STORAGE_ROOT_DIR_NAME)
+                || relativePath.startsWith(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME)) {
+            return false;
+        }
+        String indexSubPath = StoragePathUtil.getIndexSubPath(indexDir, true);
+        return !indexSubPath.isEmpty() && !uncachedIndexes.contains(indexSubPath);
+    }
+
+    @Override
+    public synchronized boolean downloadData(FileReference indexFile) throws HyracksDataException {
+        String indexSubPath = StoragePathUtil.getIndexSubPath(indexFile, false);
+        Set<FileReference> toDownload = new HashSet<>();
+        for (FileReference fileReference : uncachedDataFiles) {
+            if (fileReference.getRelativePath().contains(indexSubPath)) {
+                toDownload.add(fileReference.getParent());
+            }
+        }
+
+        LOGGER.info("Downloading data files for {} in all partitions: {}", indexSubPath, toDownload);
+        Collection<FileReference> failed = downloader.downloadDirectories(toDownload);
+        if (!failed.isEmpty()) {
+            LOGGER.warn("Failed to download data files {}. Re-downloading: {}", indexSubPath, failed);
+            downloader.downloadFiles(failed);
+        }
+        LOGGER.info("Finished downloading data files for {}", indexSubPath);
+        uncachedIndexes.remove(indexSubPath);
+        uncachedDataFiles.removeIf(f -> f.getRelativePath().contains(indexSubPath));
+        return isEmpty();
+    }
+
+    @Override
+    public synchronized boolean downloadMetadata(FileReference indexFile) throws HyracksDataException {
+        String indexSubPath = StoragePathUtil.getIndexSubPath(indexFile, false);
+        Set<FileReference> toDownload = new HashSet<>();
+        for (FileReference fileReference : uncachedMetadataFiles) {
+            if (fileReference.getRelativePath().contains(indexSubPath)) {
+                toDownload.add(fileReference);
+            }
+        }
+
+        LOGGER.info("Downloading metadata files for {} in all partitions: {}", indexSubPath, toDownload);
+        downloader.downloadFiles(toDownload);
+        LOGGER.info("Finished downloading metadata files for {}", indexSubPath);
+        uncachedMetadataFiles.removeAll(toDownload);
+        return isEmpty();
+    }
+
+    @Override
+    public boolean remove(Collection<FileReference> deletedFiles) {
+        LOGGER.info("Deleting {}", deletedFiles);
+        for (FileReference fileReference : deletedFiles) {
+            remove(fileReference);
+        }
+
+        return isEmpty();
+    }
+
+    @Override
+    public boolean remove(FileReference deletedFile) {
+        LOGGER.info("Deleting {}", deletedFile);
+        if (COMPONENT_FILES_FILTER.accept(null, deletedFile.getName())) {
+            uncachedDataFiles.remove(deletedFile);
+        } else {
+            uncachedMetadataFiles.remove(deletedFile);
+        }
+
+        return isEmpty();
+    }
+
+    @Override
+    public void close() {
+        downloader.close();
+        LOGGER.info("Parallel cacher was closed");
+    }
+
+    private Set<FileReference> getFiles(List<FileReference> uncachedFiles, FilenameFilter filter) {
+        Set<FileReference> fileReferences = ConcurrentHashMap.newKeySet();
+        for (FileReference fileReference : uncachedFiles) {
+            if (filter.accept(null, fileReference.getName())) {
+                fileReferences.add(fileReference);
+            }
+        }
+        return fileReferences;
+    }
+
+    private Set<String> getUncachedIndexes(List<FileReference> uncachedFiles) {
+        Set<String> uncachedIndexes = ConcurrentHashMap.newKeySet();
+        for (FileReference indexFile : uncachedFiles) {
+            uncachedIndexes.add(StoragePathUtil.getIndexSubPath(indexFile, false));
+        }
+        return uncachedIndexes;
+    }
+
+    private synchronized boolean isEmpty() {
+        int totalSize = uncachedDataFiles.size() + uncachedMetadataFiles.size();
+        LOGGER.info("Current number of uncached files {}", totalSize);
+        return totalSize == 0;
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/AbstractLazyAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/AbstractLazyAccessor.java
index de7efc1..c7ce222 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/AbstractLazyAccessor.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/AbstractLazyAccessor.java
@@ -42,18 +42,23 @@
         this.localIoManager = localIoManager;
     }
 
-    int doCloudDelete(FileReference fileReference) throws HyracksDataException {
-        int numberOfCloudDeletes = 0;
+    Set<FileReference> doCloudDelete(FileReference fileReference) throws HyracksDataException {
+        Set<FileReference> deletedFiles = Collections.emptySet();
         if (!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileReference.getAbsolutePath()))) {
             File localFile = fileReference.getFile();
-            // if file reference exists,and it is a file, then list is not required
-            Set<String> paths =
-                    localFile.exists() && localFile.isFile() ? Collections.singleton(fileReference.getRelativePath())
-                            : doList(fileReference, IoUtil.NO_OP_FILTER).stream().map(FileReference::getRelativePath)
-                                    .collect(Collectors.toSet());
+
+            Set<String> paths;
+            if (localFile.exists() && localFile.isFile()) {
+                // If file reference exists, and it is a file, then list is not required
+                paths = Collections.singleton(fileReference.getRelativePath());
+            } else {
+                // List and delete
+                deletedFiles = doList(fileReference, IoUtil.NO_OP_FILTER);
+                paths = deletedFiles.stream().map(FileReference::getRelativePath).collect(Collectors.toSet());
+            }
+
             cloudClient.deleteObjects(bucket, paths);
-            numberOfCloudDeletes = paths.size();
         }
-        return numberOfCloudDeletes;
+        return deletedFiles;
     }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
index b93da54..798163d 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
@@ -20,8 +20,8 @@
 
 import java.util.Collections;
 
-import org.apache.asterix.cloud.WriteBufferProvider;
 import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.lazy.NoOpParallelCacher;
 import org.apache.hyracks.control.nc.io.IOManager;
 
 /**
@@ -32,18 +32,7 @@
     private static final ILazyAccessorReplacer NO_OP_REPLACER = () -> {
     };
 
-    public InitialCloudAccessor(ICloudClient cloudClient, String bucket, IOManager localIoManager,
-            WriteBufferProvider writeBufferProvider) {
-        super(cloudClient, bucket, localIoManager, Collections.emptySet(), 0, writeBufferProvider, NO_OP_REPLACER);
-    }
-
-    @Override
-    protected void decrementNumberOfUncachedFiles() {
-        // No Op
-    }
-
-    @Override
-    protected void decrementNumberOfUncachedFiles(int count) {
-        // No Op
+    public InitialCloudAccessor(ICloudClient cloudClient, String bucket, IOManager localIoManager) {
+        super(cloudClient, bucket, localIoManager, Collections.emptySet(), NO_OP_REPLACER, NoOpParallelCacher.INSTANCE);
     }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
index 19873e8..3dd579b 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
@@ -19,48 +19,39 @@
 package org.apache.asterix.cloud.lazy.accessor;
 
 import java.io.FilenameFilter;
-import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
 import org.apache.asterix.cloud.CloudFileHandle;
-import org.apache.asterix.cloud.WriteBufferProvider;
 import org.apache.asterix.cloud.bulk.IBulkOperationCallBack;
 import org.apache.asterix.cloud.clients.ICloudClient;
-import org.apache.asterix.cloud.util.CloudFileUtil;
+import org.apache.asterix.cloud.lazy.IParallelCacher;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.control.nc.io.IOManager;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 /**
  * ReplaceableCloudAccessor will be used when some (or all) of the files in the cloud storage are not cached locally.
  * It will be replaced by {@link LocalAccessor} once everything is cached
  */
 public class ReplaceableCloudAccessor extends AbstractLazyAccessor {
-    private static final Logger LOGGER = LogManager.getLogger();
     private final Set<Integer> partitions;
-    private final AtomicInteger numberOfUncachedFiles;
-    private final WriteBufferProvider writeBufferProvider;
     private final ILazyAccessorReplacer replacer;
-    private final IBulkOperationCallBack callBack;
+    private final IParallelCacher cacher;
+    private final IBulkOperationCallBack deleteCallBack;
 
     public ReplaceableCloudAccessor(ICloudClient cloudClient, String bucket, IOManager localIoManager,
-            Set<Integer> partitions, int numberOfUncachedFiles, WriteBufferProvider writeBufferProvider,
-            ILazyAccessorReplacer replacer) {
+            Set<Integer> partitions, ILazyAccessorReplacer replacer, IParallelCacher cacher) {
         super(cloudClient, bucket, localIoManager);
         this.partitions = partitions;
-        this.numberOfUncachedFiles = new AtomicInteger(numberOfUncachedFiles);
-        this.writeBufferProvider = writeBufferProvider;
         this.replacer = replacer;
-        this.callBack = (numberOfAffectedLocalFiles, paths) -> {
-            int totalUncached = paths.size() - numberOfAffectedLocalFiles;
-            replaceAccessor(this.numberOfUncachedFiles.addAndGet(-totalUncached));
+        this.cacher = cacher;
+        deleteCallBack = deletedFiles -> {
+            if (cacher.remove(deletedFiles)) {
+                replace();
+            }
         };
     }
 
@@ -71,7 +62,7 @@
 
     @Override
     public IBulkOperationCallBack getBulkOperationCallBack() {
-        return callBack;
+        return deleteCallBack;
     }
 
     @Override
@@ -79,25 +70,65 @@
             IIOManager.FileSyncMode syncMode) throws HyracksDataException {
         FileReference fileRef = fileHandle.getFileReference();
         if (!localIoManager.exists(fileRef) && cloudClient.exists(bucket, fileRef.getRelativePath())) {
-            // File doesn't exist locally, download it.
-            ByteBuffer writeBuffer = writeBufferProvider.getBuffer();
-            try {
-                // TODO download for all partitions at once
-                LOGGER.info("Downloading {} ..", fileRef.getRelativePath());
-                CloudFileUtil.downloadFile(localIoManager, cloudClient, bucket, fileHandle, rwMode, syncMode,
-                        writeBuffer);
-                localIoManager.close(fileHandle);
-                LOGGER.info("Finished downloading {}..", fileRef.getRelativePath());
-            } finally {
-                writeBufferProvider.recycle(writeBuffer);
+            if (cacher.downloadData(fileRef)) {
+                replace();
             }
-            // TODO decrement by the number of downloaded files in all partitions (once the above TODO is fixed)
-            decrementNumberOfUncachedFiles();
         }
     }
 
     @Override
     public Set<FileReference> doList(FileReference dir, FilenameFilter filter) throws HyracksDataException {
+        if (cacher.isCached(dir)) {
+            return localIoManager.list(dir, filter);
+        }
+        return cloudBackedList(dir, filter);
+    }
+
+    @Override
+    public boolean doExists(FileReference fileRef) throws HyracksDataException {
+        return localIoManager.exists(fileRef) || cloudClient.exists(bucket, fileRef.getRelativePath());
+    }
+
+    @Override
+    public long doGetSize(FileReference fileReference) throws HyracksDataException {
+        if (localIoManager.exists(fileReference)) {
+            return localIoManager.getSize(fileReference);
+        }
+        return cloudClient.getObjectSize(bucket, fileReference.getRelativePath());
+    }
+
+    @Override
+    public byte[] doReadAllBytes(FileReference fileRef) throws HyracksDataException {
+        if (!localIoManager.exists(fileRef) && isInNodePartition(fileRef.getRelativePath())) {
+            if (cacher.downloadMetadata(fileRef)) {
+                replace();
+            }
+        }
+        return localIoManager.readAllBytes(fileRef);
+    }
+
+    @Override
+    public void doDelete(FileReference fileReference) throws HyracksDataException {
+        // Never delete the storage dir in cloud storage
+        Set<FileReference> deletedFiles = doCloudDelete(fileReference);
+        if (cacher.remove(deletedFiles)) {
+            replace();
+        }
+        // Finally, delete locally
+        localIoManager.delete(fileReference);
+    }
+
+    @Override
+    public void doOverwrite(FileReference fileReference, byte[] bytes) throws HyracksDataException {
+        boolean existsLocally = localIoManager.exists(fileReference);
+        cloudClient.write(bucket, fileReference.getRelativePath(), bytes);
+        localIoManager.overwrite(fileReference, bytes);
+        if (!existsLocally && cacher.remove(fileReference)) {
+            replace();
+        }
+    }
+
+    private Set<FileReference> cloudBackedList(FileReference dir, FilenameFilter filter) throws HyracksDataException {
         Set<String> cloudFiles = cloudClient.listObjects(bucket, dir.getRelativePath(), filter);
         if (cloudFiles.isEmpty()) {
             return Collections.emptySet();
@@ -127,93 +158,12 @@
         return localFiles;
     }
 
-    @Override
-    public boolean doExists(FileReference fileRef) throws HyracksDataException {
-        return localIoManager.exists(fileRef) || cloudClient.exists(bucket, fileRef.getRelativePath());
-    }
-
-    @Override
-    public long doGetSize(FileReference fileReference) throws HyracksDataException {
-        if (localIoManager.exists(fileReference)) {
-            return localIoManager.getSize(fileReference);
-        }
-        return cloudClient.getObjectSize(bucket, fileReference.getRelativePath());
-    }
-
-    @Override
-    public byte[] doReadAllBytes(FileReference fileRef) throws HyracksDataException {
-        if (!localIoManager.exists(fileRef) && isInNodePartition(fileRef.getRelativePath())) {
-            byte[] bytes = cloudClient.readAllBytes(bucket, fileRef.getRelativePath());
-            if (bytes != null && !partitions.isEmpty()) {
-                // Download the missing file for subsequent reads
-                LOGGER.info("Downloading {} ..", fileRef.getRelativePath());
-                localIoManager.overwrite(fileRef, bytes);
-                decrementNumberOfUncachedFiles();
-            }
-            return bytes;
-        }
-        return localIoManager.readAllBytes(fileRef);
-    }
-
-    @Override
-    public void doDelete(FileReference fileReference) throws HyracksDataException {
-        // Never delete the storage dir in cloud storage
-        int numberOfCloudDeletes = doCloudDelete(fileReference);
-        // check local
-        if (numberOfCloudDeletes > 0) {
-            int numberOfLocalDeletes;
-            if (numberOfCloudDeletes == 1) {
-                // file delete
-                numberOfLocalDeletes = localIoManager.exists(fileReference) ? 1 : 0;
-            } else {
-                // directory delete
-                Set<String> localToBeDeleted = localIoManager.list(fileReference).stream()
-                        .map(FileReference::getRelativePath).collect(Collectors.toSet());
-                numberOfLocalDeletes = localToBeDeleted.size();
-            }
-            // Decrement by number of cloud deletes that have no counterparts locally
-            decrementNumberOfUncachedFiles(numberOfCloudDeletes - numberOfLocalDeletes);
-        }
-
-        // Finally, delete locally
-        localIoManager.delete(fileReference);
-    }
-
-    @Override
-    public void doOverwrite(FileReference fileReference, byte[] bytes) throws HyracksDataException {
-        boolean existsLocally = localIoManager.exists(fileReference);
-        cloudClient.write(bucket, fileReference.getRelativePath(), bytes);
-        localIoManager.overwrite(fileReference, bytes);
-        if (!existsLocally) {
-            decrementNumberOfUncachedFiles();
-        }
-    }
-
-    protected void decrementNumberOfUncachedFiles() {
-        replaceAccessor(numberOfUncachedFiles.decrementAndGet());
-    }
-
-    protected void decrementNumberOfUncachedFiles(int count) {
-        if (count > 0) {
-            replaceAccessor(numberOfUncachedFiles.addAndGet(-count));
-        }
-    }
-
     private boolean isInNodePartition(String path) {
         return partitions.contains(StoragePathUtil.getPartitionNumFromRelativePath(path));
     }
 
-    void replaceAccessor(int remainingUncached) {
-        if (remainingUncached > 0) {
-            // Some files still not cached yet
-            return;
-        }
-
-        if (remainingUncached < 0) {
-            // This should not happen, log in case that happen
-            LOGGER.warn("Some files were downloaded multiple times. Reported remaining uncached files = {}",
-                    remainingUncached);
-        }
+    private void replace() {
+        cacher.close();
         replacer.replace();
     }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/util/CloudFileUtil.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/util/CloudFileUtil.java
index aee916c..08cbf43 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/util/CloudFileUtil.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/util/CloudFileUtil.java
@@ -18,20 +18,11 @@
  */
 package org.apache.asterix.cloud.util;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.Set;
 
-import org.apache.asterix.cloud.clients.ICloudClient;
-import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IFileHandle;
-import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.control.nc.io.FileHandle;
 import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -42,32 +33,12 @@
     private CloudFileUtil() {
     }
 
-    public static void downloadFile(IOManager ioManager, ICloudClient cloudClient, String bucket, FileHandle fileHandle,
-            IIOManager.FileReadWriteMode rwMode, IIOManager.FileSyncMode syncMode, ByteBuffer writeBuffer)
-            throws HyracksDataException {
-        FileReference fileRef = fileHandle.getFileReference();
-        File file = fileRef.getFile();
-
-        try (InputStream inputStream = cloudClient.getObjectStream(bucket, fileRef.getRelativePath())) {
-            FileUtils.createParentDirectories(file);
-            if (!file.createNewFile()) {
-                throw new IllegalStateException("Couldn't create local file");
-            }
-
-            fileHandle.open(rwMode, syncMode);
-            writeToFile(ioManager, fileHandle, inputStream, writeBuffer);
-        } catch (IOException e) {
-            throw HyracksDataException.create(e);
-        }
-    }
-
     public static void cleanDirectoryFiles(IOManager ioManager, Set<String> cloudFiles, FileReference partitionPath)
             throws HyracksDataException {
         // First get the set of local files
         Set<FileReference> localFiles = ioManager.list(partitionPath);
         Iterator<FileReference> localFilesIter = localFiles.iterator();
-        LOGGER.info("Cleaning partition {}. Total number of unchecked cloud files {}", partitionPath.getRelativePath(),
-                cloudFiles.size());
+        LOGGER.info("Cleaning partition {}.", partitionPath.getRelativePath());
 
         // Reconcile local files and cloud files
         while (localFilesIter.hasNext()) {
@@ -98,39 +69,6 @@
         }
     }
 
-    private static void writeToFile(IOManager ioManager, IFileHandle fileHandle, InputStream inStream,
-            ByteBuffer writeBuffer) throws HyracksDataException {
-        writeBuffer.clear();
-        try {
-            int position = 0;
-            long offset = 0;
-            int read;
-            while ((read = inStream.read(writeBuffer.array(), position, writeBuffer.remaining())) >= 0) {
-                position += read;
-                writeBuffer.position(position);
-                if (writeBuffer.remaining() == 0) {
-                    offset += writeBufferToFile(ioManager, fileHandle, writeBuffer, offset);
-                    position = 0;
-                }
-            }
-
-            if (writeBuffer.position() > 0) {
-                writeBufferToFile(ioManager, fileHandle, writeBuffer, offset);
-                ioManager.sync(fileHandle, true);
-            }
-        } catch (IOException e) {
-            throw HyracksDataException.create(e);
-        }
-    }
-
-    private static long writeBufferToFile(IOManager ioManager, IFileHandle fileHandle, ByteBuffer writeBuffer,
-            long offset) throws HyracksDataException {
-        writeBuffer.flip();
-        long written = ioManager.doSyncWrite(fileHandle, offset, writeBuffer);
-        writeBuffer.clear();
-        return written;
-    }
-
     private static void logDeleteFile(FileReference fileReference) {
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Deleting {} from the local cache as {} doesn't exist in the cloud", fileReference,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 9702b18..418e171 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.common.utils;
 
+import static org.apache.asterix.common.utils.StorageConstants.PARTITION_DIR_PREFIX;
+import static org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
+
 import java.io.File;
 import java.nio.file.Paths;
 import java.util.Iterator;
@@ -43,6 +46,7 @@
 
     private static final Logger LOGGER = LogManager.getLogger();
     public static final char DATAVERSE_CONTINUATION_MARKER = '^';
+    private static String PARTITION_PATH = STORAGE_ROOT_DIR_NAME + File.separator + PARTITION_DIR_PREFIX;
 
     private StoragePathUtil() {
     }
@@ -71,8 +75,7 @@
     }
 
     public static String prepareStoragePartitionPath(int partitonId) {
-        return Paths.get(StorageConstants.STORAGE_ROOT_DIR_NAME, StorageConstants.PARTITION_DIR_PREFIX + partitonId)
-                .toString();
+        return Paths.get(StorageConstants.STORAGE_ROOT_DIR_NAME, PARTITION_DIR_PREFIX + partitonId).toString();
     }
 
     public static String prepareIngestionLogPath() {
@@ -103,8 +106,7 @@
     }
 
     public static int getPartitionNumFromRelativePath(String relativePath) {
-        int startIdx = relativePath.lastIndexOf(StorageConstants.PARTITION_DIR_PREFIX)
-                + StorageConstants.PARTITION_DIR_PREFIX.length();
+        int startIdx = relativePath.lastIndexOf(PARTITION_DIR_PREFIX) + PARTITION_DIR_PREFIX.length();
         int partitionEndIdx = relativePath.indexOf(File.separatorChar, startIdx);
         int idxEnd = partitionEndIdx != -1 ? partitionEndIdx : relativePath.length();
         String partition = relativePath.substring(startIdx, idxEnd);
@@ -183,4 +185,32 @@
     public static FileReference getIndexPath(IIOManager ioManager, ResourceReference ref) throws HyracksDataException {
         return ioManager.resolve(ref.getRelativePath().toString());
     }
+
+    /**
+     * Returns the index's path after the partition directory
+     * Example:
+     * - Input:
+     * /../storage/partition_8/dataverse_p1[/^dataverse_p2[/^dataverse_p3...]]/dataset/rebalanceCount/index/0_0_b
+     * - Output
+     * dataverse_p1[/^dataverse_p2[/^dataverse_p3...]]/dataset/rebalanceCount/index
+     *
+     * @param fileReference a file inside the index director
+     * @param isDirectory   if the provided {@link FileReference} corresponds to a directory
+     * @return index path
+     */
+    public static String getIndexSubPath(FileReference fileReference, boolean isDirectory) {
+        String relativePath = fileReference.getRelativePath();
+        if (relativePath.length() <= PARTITION_PATH.length() || !relativePath.startsWith(PARTITION_PATH)) {
+            return "";
+        }
+        String partition = PARTITION_PATH + getPartitionNumFromRelativePath(relativePath);
+        int partitionStart = relativePath.indexOf(partition);
+        int start = partitionStart + partition.length() + 1;
+        int end = isDirectory ? relativePath.length() : relativePath.lastIndexOf('/');
+        if (start >= end) {
+            // This could happen if the provided path contains only a partition path (e.g., storage/partition_0)
+            return "";
+        }
+        return relativePath.substring(start, end);
+    }
 }
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index 161374c..4f990ea 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -91,6 +91,7 @@
     <jacoco.version>0.7.6.201602180812</jacoco.version>
     <log4j.version>2.19.0</log4j.version>
     <awsjavasdk.version>2.20.135</awsjavasdk.version>
+    <awsjavasdk.crt.version>0.27.1</awsjavasdk.crt.version>
     <parquet.version>1.12.3</parquet.version>
     <hadoop-awsjavasdk.version>1.12.402</hadoop-awsjavasdk.version>
     <azureblobjavasdk.version>12.22.0</azureblobjavasdk.version>
@@ -1641,6 +1642,11 @@
         <artifactId>sts</artifactId>
         <version>${awsjavasdk.version}</version>
       </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk.crt</groupId>
+        <artifactId>aws-crt</artifactId>
+        <version>${awsjavasdk.crt.version}</version>
+      </dependency>
       <!-- Mock for AWS S3 -->
       <dependency>
         <groupId>io.findify</groupId>