[ASTERIXDB-3387][STO]: Prepare asterix-cloud for unlimited storage

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
Prepare asterix-cloud for supporting 'Selective' accessor,
where a file can have holes or can be evicted entirely.

Change-Id: I6653bbc780bb33e900170a65b4a7334c1c97a80e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18248
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Wail Alkowaileet <wael.y.k@gmail.com>
diff --git a/asterixdb/asterix-cloud/pom.xml b/asterixdb/asterix-cloud/pom.xml
index 36cb039..659a6d7 100644
--- a/asterixdb/asterix-cloud/pom.xml
+++ b/asterixdb/asterix-cloud/pom.xml
@@ -137,6 +137,11 @@
 
   <dependencies>
     <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-cloud</artifactId>
+      <version>${hyracks.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.asterix</groupId>
       <artifactId>asterix-common</artifactId>
       <version>${project.version}</version>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 7ab8a5a..8cf5ac5 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -24,6 +24,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -33,6 +34,7 @@
 import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
 import org.apache.asterix.cloud.bulk.NoOpDeleteBulkCallBack;
 import org.apache.asterix.cloud.clients.CloudClientProvider;
+import org.apache.asterix.cloud.clients.CloudFile;
 import org.apache.asterix.cloud.clients.ICloudClient;
 import org.apache.asterix.cloud.clients.ICloudWriter;
 import org.apache.asterix.cloud.util.CloudFileUtil;
@@ -47,6 +49,7 @@
 import org.apache.hyracks.api.io.IFileHandle;
 import org.apache.hyracks.api.io.IIOBulkOperation;
 import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
 import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -54,7 +57,7 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-public abstract class AbstractCloudIOManager extends IOManager implements IPartitionBootstrapper {
+public abstract class AbstractCloudIOManager extends IOManager implements IPartitionBootstrapper, ICloudIOManager {
     private static final Logger LOGGER = LogManager.getLogger();
     protected final ICloudClient cloudClient;
     protected final IWriteBufferProvider writeBufferProvider;
@@ -86,8 +89,8 @@
 
     @Override
     public IRecoveryManager.SystemState getSystemStateOnMissingCheckpoint() {
-        Set<String> existingMetadataFiles = getCloudMetadataPartitionFiles();
-        String bootstrapMarkerPath = StoragePathUtil.getBootstrapMarkerRelativePath(nsPathResolver);
+        Set<CloudFile> existingMetadataFiles = getCloudMetadataPartitionFiles();
+        CloudFile bootstrapMarkerPath = CloudFile.of(StoragePathUtil.getBootstrapMarkerRelativePath(nsPathResolver));
         if (existingMetadataFiles.isEmpty() || existingMetadataFiles.contains(bootstrapMarkerPath)) {
             LOGGER.info("First time to initialize this cluster: systemState = PERMANENT_DATA_LOSS");
             return IRecoveryManager.SystemState.PERMANENT_DATA_LOSS;
@@ -122,6 +125,7 @@
             deleteUnkeptPartitionDirs(currentOnDiskPartitions);
             cleanupLocalFiles();
         }
+
         // Has different implementations depending on the caching policy
         downloadPartitions(metadataNode, metadataPartition);
     }
@@ -138,7 +142,7 @@
     }
 
     private void cleanupLocalFiles() throws HyracksDataException {
-        Set<String> cloudFiles = cloudClient.listObjects(bucket, STORAGE_ROOT_DIR_NAME, IoUtil.NO_OP_FILTER);
+        Set<CloudFile> cloudFiles = cloudClient.listObjects(bucket, STORAGE_ROOT_DIR_NAME, IoUtil.NO_OP_FILTER);
         if (cloudFiles.isEmpty()) {
             LOGGER.warn("No files in the cloud. Deleting all local files in partitions {}...", partitions);
             for (FileReference partitionPath : partitionPaths) {
@@ -159,6 +163,53 @@
 
     /*
      * ******************************************************************
+     * ICloudIOManager functions
+     * ******************************************************************
+     */
+
+    @Override
+    public final void cloudRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+        cloudClient.read(bucket, fHandle.getFileReference().getRelativePath(), offset, data);
+    }
+
+    @Override
+    public final InputStream cloudRead(IFileHandle fHandle, long offset, long length) {
+        return cloudClient.getObjectStream(bucket, fHandle.getFileReference().getRelativePath(), offset, length);
+    }
+
+    @Override
+    public final int localWriter(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+        return localIoManager.doSyncWrite(fHandle, offset, data);
+    }
+
+    @Override
+    public final int cloudWrite(IFileHandle fHandle, ByteBuffer data) throws HyracksDataException {
+        ICloudWriter cloudWriter = ((CloudFileHandle) fHandle).getCloudWriter();
+        int writtenBytes;
+        try {
+            writtenBytes = cloudWriter.write(data);
+        } catch (HyracksDataException e) {
+            cloudWriter.abort();
+            throw e;
+        }
+        return writtenBytes;
+    }
+
+    @Override
+    public final long cloudWrite(IFileHandle fHandle, ByteBuffer[] data) throws HyracksDataException {
+        ICloudWriter cloudWriter = ((CloudFileHandle) fHandle).getCloudWriter();
+        int writtenBytes;
+        try {
+            writtenBytes = cloudWriter.write(data[0], data[1]);
+        } catch (HyracksDataException e) {
+            cloudWriter.abort();
+            throw e;
+        }
+        return writtenBytes;
+    }
+
+    /*
+     * ******************************************************************
      * IIOManager functions
      * ******************************************************************
      */
@@ -166,7 +217,7 @@
     @Override
     public final IFileHandle open(FileReference fileRef, FileReadWriteMode rwMode, FileSyncMode syncMode)
             throws HyracksDataException {
-        ICloudWriter cloudWriter = cloudClient.createdWriter(bucket, fileRef.getRelativePath(), writeBufferProvider);
+        ICloudWriter cloudWriter = cloudClient.createWriter(bucket, fileRef.getRelativePath(), writeBufferProvider);
         CloudFileHandle fHandle = new CloudFileHandle(fileRef, cloudWriter);
         onOpen(fHandle);
         try {
@@ -188,26 +239,14 @@
     public final long doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray)
             throws HyracksDataException {
         long writtenBytes = localIoManager.doSyncWrite(fHandle, offset, dataArray);
-        ICloudWriter cloudWriter = ((CloudFileHandle) fHandle).getCloudWriter();
-        try {
-            cloudWriter.write(dataArray[0], dataArray[1]);
-        } catch (HyracksDataException e) {
-            cloudWriter.abort();
-            throw e;
-        }
+        cloudWrite(fHandle, dataArray);
         return writtenBytes;
     }
 
     @Override
     public final int doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer dataArray) throws HyracksDataException {
         int writtenBytes = localIoManager.doSyncWrite(fHandle, offset, dataArray);
-        ICloudWriter cloudWriter = ((CloudFileHandle) fHandle).getCloudWriter();
-        try {
-            cloudWriter.write(dataArray);
-        } catch (HyracksDataException e) {
-            cloudWriter.abort();
-            throw e;
-        }
+        cloudWrite(fHandle, dataArray);
         return writtenBytes;
     }
 
@@ -294,24 +333,24 @@
         cloudClient.write(bucket, key, bytes);
     }
 
-    private Set<String> getCloudMetadataPartitionFiles() {
+    private Set<CloudFile> getCloudMetadataPartitionFiles() {
         String metadataNamespacePath = StoragePathUtil.getNamespacePath(nsPathResolver,
                 MetadataConstants.METADATA_NAMESPACE, METADATA_PARTITION);
         return cloudClient.listObjects(bucket, metadataNamespacePath, IoUtil.NO_OP_FILTER);
     }
 
     private void ensureCompleteMetadataBootstrap() throws HyracksDataException {
-        Set<String> metadataPartitionFiles = getCloudMetadataPartitionFiles();
-        boolean foundBootstrapMarker =
-                metadataPartitionFiles.contains(StoragePathUtil.getBootstrapMarkerRelativePath(nsPathResolver));
+        Set<CloudFile> metadataPartitionFiles = getCloudMetadataPartitionFiles();
+        CloudFile marker = CloudFile.of(StoragePathUtil.getBootstrapMarkerRelativePath(nsPathResolver));
+        boolean foundBootstrapMarker = metadataPartitionFiles.contains(marker);
         // if the bootstrap file exists, we failed to bootstrap --> delete all partial files in metadata partition
         if (foundBootstrapMarker) {
             LOGGER.info(
                     "detected failed bootstrap attempted, deleting all existing files in the metadata partition: {}",
                     metadataPartitionFiles);
             IIOBulkOperation deleteBulkOperation = createDeleteBulkOperation();
-            for (String file : metadataPartitionFiles) {
-                deleteBulkOperation.add(resolve(file));
+            for (CloudFile file : metadataPartitionFiles) {
+                deleteBulkOperation.add(resolve(file.getPath()));
             }
             performBulkOperation(deleteBulkOperation);
         }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
index 0b4200c..04979e6 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
@@ -30,6 +30,7 @@
 import org.apache.asterix.common.config.CloudProperties;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
 import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.logging.log4j.LogManager;
@@ -94,4 +95,14 @@
         cloudClient.write(bucket, fileRef.getRelativePath(), bytes);
         localIoManager.overwrite(fileRef, bytes);
     }
+
+    @Override
+    public int punchHole(IFileHandle fHandle, long offset, long length) {
+        throw new UnsupportedOperationException("punchHole is not supported with Eager caching");
+    }
+
+    @Override
+    public void evict(FileReference directory) {
+        throw new UnsupportedOperationException("evict is not supported with Eager caching");
+    }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/IWriteBufferProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/IWriteBufferProvider.java
index 693b73a..6a0a023 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/IWriteBufferProvider.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/IWriteBufferProvider.java
@@ -20,8 +20,19 @@
 
 import java.nio.ByteBuffer;
 
+/**
+ * Buffer provider and recycler
+ */
 public interface IWriteBufferProvider {
+    /**
+     * @return a buffer
+     */
     ByteBuffer getBuffer();
 
+    /**
+     * Return the buffer to reused
+     *
+     * @param buffer to recycle
+     */
     void recycle(ByteBuffer buffer);
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index cb47d00..fa4cd56 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.cloud;
 
-import static org.apache.asterix.cloud.lazy.ParallelCacher.METADATA_FILTER;
+import static org.apache.asterix.cloud.util.CloudFileUtil.METADATA_FILTER;
 import static org.apache.asterix.common.utils.StorageConstants.PARTITION_DIR_PREFIX;
 import static org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
 
@@ -32,6 +32,7 @@
 import java.util.stream.Collectors;
 
 import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
+import org.apache.asterix.cloud.clients.CloudFile;
 import org.apache.asterix.cloud.clients.IParallelDownloader;
 import org.apache.asterix.cloud.lazy.ParallelCacher;
 import org.apache.asterix.cloud.lazy.accessor.ILazyAccessor;
@@ -44,6 +45,7 @@
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
 import org.apache.hyracks.api.io.IIOBulkOperation;
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.api.util.IoUtil;
@@ -85,17 +87,17 @@
     protected synchronized void downloadPartitions(boolean metadataNode, int metadataPartition)
             throws HyracksDataException {
         // Get the files in all relevant partitions from the cloud
-        Set<String> cloudFiles = cloudClient.listObjects(bucket, STORAGE_ROOT_DIR_NAME, IoUtil.NO_OP_FILTER).stream()
-                .filter(f -> partitions.contains(StoragePathUtil.getPartitionNumFromRelativePath(f)))
+        Set<CloudFile> cloudFiles = cloudClient.listObjects(bucket, STORAGE_ROOT_DIR_NAME, IoUtil.NO_OP_FILTER).stream()
+                .filter(f -> partitions.contains(StoragePathUtil.getPartitionNumFromRelativePath(f.getPath())))
                 .collect(Collectors.toSet());
 
         // Get all files stored locally
-        Set<String> localFiles = new HashSet<>();
+        Set<CloudFile> localFiles = new HashSet<>();
         for (IODeviceHandle deviceHandle : getIODevices()) {
             FileReference storageRoot = deviceHandle.createFileRef(STORAGE_ROOT_DIR_NAME);
             Set<FileReference> deviceFiles = localIoManager.list(storageRoot, IoUtil.NO_OP_FILTER);
             for (FileReference fileReference : deviceFiles) {
-                localFiles.add(fileReference.getRelativePath());
+                localFiles.add(CloudFile.of(fileReference.getRelativePath()));
             }
         }
 
@@ -113,7 +115,7 @@
             // Download all metadata files to avoid (List) calls to the cloud when listing/reading these files
             downloadMetadataFiles(downloader, uncachedFiles);
             // Create a parallel cacher which download and monitor all uncached files
-            ParallelCacher cacher = new ParallelCacher(downloader, uncachedFiles);
+            ParallelCacher cacher = new ParallelCacher(downloader, uncachedFiles, true);
             // Local cache misses some files, cloud-based accessor is needed for read operations
             accessor = new ReplaceableCloudAccessor(cloudClient, bucket, localIoManager, partitions, replacer, cacher);
         } else {
@@ -183,14 +185,31 @@
         log("WRITE", fileRef);
     }
 
-    private List<FileReference> resolve(Set<String> cloudFiles) throws HyracksDataException {
+    @Override
+    public int punchHole(IFileHandle fileHandle, long offset, long length) throws HyracksDataException {
+        // TODO implement for Selective accessor
+        return -1;
+    }
+
+    @Override
+    public void evict(FileReference directory) throws HyracksDataException {
+        // TODO implement for Selective accessor
+    }
+
+    private List<FileReference> resolve(Set<CloudFile> cloudFiles) throws HyracksDataException {
         List<FileReference> fileReferences = new ArrayList<>();
-        for (String file : cloudFiles) {
+        for (CloudFile file : cloudFiles) {
             fileReferences.add(resolve(file));
         }
         return fileReferences;
     }
 
+    private FileReference resolve(CloudFile file) throws HyracksDataException {
+        String path = file.getPath();
+        IODeviceHandle devHandle = getDeviceComputer().resolve(path, getIODevices());
+        return new UncachedFileReference(devHandle, path, file.getSize());
+    }
+
     private void log(String op, FileReference fileReference) {
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("{} {}", op, fileReference.getRelativePath());
@@ -199,7 +218,7 @@
 
     private void downloadMetadataFiles(IParallelDownloader downloader, List<FileReference> uncachedFiles)
             throws HyracksDataException {
-        Set<FileReference> uncachedMetadataFiles = ParallelCacher.getFiles(uncachedFiles, METADATA_FILTER);
+        Set<FileReference> uncachedMetadataFiles = ParallelCacher.getFiles(uncachedFiles, METADATA_FILTER).keySet();
         if (!uncachedMetadataFiles.isEmpty()) {
             LOGGER.debug("Downloading metadata files for all partitions; current uncached files: {}", uncachedFiles);
             downloader.downloadFiles(uncachedMetadataFiles);
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/UncachedFileReference.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/UncachedFileReference.java
new file mode 100644
index 0000000..1f1891d
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/UncachedFileReference.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IODeviceHandle;
+
+public class UncachedFileReference extends FileReference {
+    private static final long serialVersionUID = -920468777697935759L;
+    private final long size;
+
+    public UncachedFileReference(IODeviceHandle dev, String path, long size) {
+        super(dev, path);
+        this.size = size;
+    }
+
+    private UncachedFileReference(FileReference fileReference) {
+        this(fileReference.getDeviceHandle(), fileReference.getRelativePath(), fileReference.getFile().length());
+    }
+
+    public long getSize() {
+        return size;
+    }
+
+    /**
+     * Convert a collection of {@link FileReference} to a collection of {@link UncachedFileReference}
+     * NOTE: Requires files to exist in the local drive
+     *
+     * @param files Collection of {@link FileReference} to convert to {@link UncachedFileReference}
+     * @return converted collection of file references
+     */
+    public static Collection<FileReference> toUncached(Collection<FileReference> files) {
+        List<FileReference> uncached = new ArrayList<>();
+        for (FileReference fileReference : files) {
+            if (!fileReference.getFile().exists()) {
+                throw new IllegalStateException(fileReference.getAbsolutePath() + " does not exist");
+            }
+            uncached.add(new UncachedFileReference(fileReference));
+        }
+
+        return uncached;
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudFile.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudFile.java
new file mode 100644
index 0000000..e1cf135
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudFile.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients;
+
+public final class CloudFile {
+    private final String path;
+    private final long size;
+
+    private CloudFile(String path, long size) {
+        this.path = path;
+        this.size = size;
+    }
+
+    public String getPath() {
+        return path;
+    }
+
+    public long getSize() {
+        return size;
+    }
+
+    @Override
+    public int hashCode() {
+        return path.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof CloudFile)) {
+            return false;
+        }
+
+        CloudFile other = (CloudFile) obj;
+        return path.equals(other.path);
+    }
+
+    @Override
+    public String toString() {
+        return path;
+    }
+
+    public static CloudFile of(String path, long size) {
+        return new CloudFile(path, size);
+    }
+
+    public static CloudFile of(String path) {
+        return new CloudFile(path, 0);
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
index 0307846..fb50dc9 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
@@ -49,7 +49,7 @@
      * @param bufferProvider buffer provider
      * @return cloud writer
      */
-    ICloudWriter createdWriter(String bucket, String path, IWriteBufferProvider bufferProvider);
+    ICloudWriter createWriter(String bucket, String path, IWriteBufferProvider bufferProvider);
 
     /**
      * Lists objects at the specified bucket and path, and applies the file name filter on the returned objects
@@ -59,7 +59,7 @@
      * @param filter filter to apply
      * @return file names returned after applying the file name filter
      */
-    Set<String> listObjects(String bucket, String path, FilenameFilter filter);
+    Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter);
 
     /**
      * Performs a range-read from the specified bucket and path starting at the offset. The amount read is equal to the
@@ -88,9 +88,11 @@
      *
      * @param bucket bucket
      * @param path   path
-     * @return inputstream
+     * @param offset offset
+     * @param length length
+     * @return input stream of requested range
      */
-    InputStream getObjectStream(String bucket, String path);
+    InputStream getObjectStream(String bucket, String path, long offset, long length);
 
     /**
      * Writes the content of the byte array into the bucket at the specified path
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 9c31e17..38a6801 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -37,6 +37,7 @@
 
 import org.apache.asterix.cloud.CloudResettableInputStream;
 import org.apache.asterix.cloud.IWriteBufferProvider;
+import org.apache.asterix.cloud.clients.CloudFile;
 import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
 import org.apache.asterix.cloud.clients.ICloudClient;
 import org.apache.asterix.cloud.clients.ICloudWriter;
@@ -98,13 +99,13 @@
     }
 
     @Override
-    public ICloudWriter createdWriter(String bucket, String path, IWriteBufferProvider bufferProvider) {
+    public ICloudWriter createWriter(String bucket, String path, IWriteBufferProvider bufferProvider) {
         ICloudBufferedWriter bufferedWriter = new S3BufferedWriter(s3Client, profiler, bucket, path);
         return new CloudResettableInputStream(bufferedWriter, bufferProvider);
     }
 
     @Override
-    public Set<String> listObjects(String bucket, String path, FilenameFilter filter) {
+    public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) {
         profiler.objectsList();
         path = config.isLocalS3Provider() ? encodeURI(path) : path;
         return filterAndGet(listS3Objects(s3Client, bucket, path), filter);
@@ -152,9 +153,11 @@
     }
 
     @Override
-    public InputStream getObjectStream(String bucket, String path) {
+    public InputStream getObjectStream(String bucket, String path, long offset, long length) {
         profiler.objectGet();
-        GetObjectRequest getReq = GetObjectRequest.builder().bucket(bucket).key(path).build();
+        long readTo = offset + length;
+        GetObjectRequest getReq =
+                GetObjectRequest.builder().range("bytes=" + offset + "-" + readTo).bucket(bucket).key(path).build();
         try {
             return s3Client.getObject(getReq);
         } catch (NoSuchKeyException e) {
@@ -167,8 +170,6 @@
     public void write(String bucket, String path, byte[] data) {
         profiler.objectWrite();
         PutObjectRequest putReq = PutObjectRequest.builder().bucket(bucket).key(path).build();
-
-        // TODO(htowaileb): add retry logic here
         s3Client.putObject(putReq, RequestBody.fromBytes(data));
     }
 
@@ -281,12 +282,12 @@
         return builder.build();
     }
 
-    private Set<String> filterAndGet(List<S3Object> contents, FilenameFilter filter) {
-        Set<String> files = new HashSet<>();
+    private Set<CloudFile> filterAndGet(List<S3Object> contents, FilenameFilter filter) {
+        Set<CloudFile> files = new HashSet<>();
         for (S3Object s3Object : contents) {
             String path = config.isLocalS3Provider() ? S3ClientUtils.decodeURI(s3Object.key()) : s3Object.key();
             if (filter.accept(null, IoUtil.getFileNameFromPath(path))) {
-                files.add(path);
+                files.add(CloudFile.of(path, s3Object.size()));
             }
         }
         return files;
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
index c725ca5..9eb3fe0 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
@@ -33,6 +33,7 @@
 import java.util.Set;
 
 import org.apache.asterix.cloud.IWriteBufferProvider;
+import org.apache.asterix.cloud.clients.CloudFile;
 import org.apache.asterix.cloud.clients.ICloudClient;
 import org.apache.asterix.cloud.clients.ICloudWriter;
 import org.apache.asterix.cloud.clients.IParallelDownloader;
@@ -86,20 +87,20 @@
     }
 
     @Override
-    public ICloudWriter createdWriter(String bucket, String path, IWriteBufferProvider bufferProvider) {
+    public ICloudWriter createWriter(String bucket, String path, IWriteBufferProvider bufferProvider) {
         return new GCSWriter(bucket, path, gcsClient, profiler);
     }
 
     @Override
-    public Set<String> listObjects(String bucket, String path, FilenameFilter filter) {
+    public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) {
         profiler.objectsList();
         Page<Blob> blobs =
                 gcsClient.list(bucket, BlobListOption.prefix(path), BlobListOption.fields(Storage.BlobField.SIZE));
 
-        Set<String> files = new HashSet<>();
+        Set<CloudFile> files = new HashSet<>();
         for (Blob blob : blobs.iterateAll()) {
             if (filter.accept(null, IoUtil.getFileNameFromPath(blob.getName()))) {
-                files.add(blob.getName());
+                files.add(CloudFile.of(blob.getName(), blob.getSize()));
             }
         }
         return files;
@@ -138,12 +139,13 @@
     }
 
     @Override
-    public InputStream getObjectStream(String bucket, String path) {
+    public InputStream getObjectStream(String bucket, String path, long offset, long length) {
         profiler.objectGet();
-        try (ReadChannel reader = gcsClient.reader(bucket, path)) {
+        try (ReadChannel reader = gcsClient.reader(bucket, path).limit(offset + length)) {
+            reader.seek(offset);
             return Channels.newInputStream(reader);
-        } catch (StorageException ex) {
-            throw new IllegalStateException(ex);
+        } catch (StorageException | IOException e) {
+            throw new IllegalStateException(e);
         }
     }
 
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java
index e3d978d..aebeab2 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.cloud.lazy;
 
+import java.io.File;
 import java.io.FilenameFilter;
 import java.util.Collection;
 import java.util.Set;
@@ -27,23 +28,51 @@
 
 public interface IParallelCacher {
     /**
-     * Check whether an index file data and metadata are already cached
+     * Check whether a file is cacheable or not
      *
-     * @param indexDir index directory
-     * @return true if the index is already cached, false otherwise
+     * @param fileReference file
+     * @return true if the file is already cached, false otherwise
      */
-    boolean isCached(FileReference indexDir);
+    boolean isCacheable(FileReference fileReference);
 
+    /**
+     * Returns a list of all uncached files
+     *
+     * @param dir    directory to list
+     * @param filter file name filter
+     * @return set of uncached files
+     */
     Set<FileReference> getUncachedFiles(FileReference dir, FilenameFilter filter);
 
     /**
+     * Returns the size of a file
+     *
+     * @param fileReference file
+     * @return the size of the file if it exists or zero otherwise (as expected when calling {@link File#length()})
+     */
+    long getSize(FileReference fileReference);
+
+    /**
+     * @return total size of uncached files
+     */
+    long getUncachedTotalSize();
+
+    /**
+     * Creates empty data files
+     *
+     * @param indexFile a file reference in an index
+     * @return true if the remaining number of uncached files is zero, false otherwise
+     */
+    boolean createEmptyDataFiles(FileReference indexFile) throws HyracksDataException;
+
+    /**
      * Downloads all index's data files for all partitions.
      * The index is inferred from the path of the provided file.
      *
      * @param indexFile a file reference in an index
      * @return true if the remaining number of uncached files is zero, false otherwise
      */
-    boolean downloadData(FileReference indexFile) throws HyracksDataException;
+    boolean downloadDataFiles(FileReference indexFile) throws HyracksDataException;
 
     /**
      * Downloads all index's metadata files for all partitions.
@@ -52,7 +81,7 @@
      * @param indexFile a file reference in an index
      * @return true if the remaining number of uncached files is zero, false otherwise
      */
-    boolean downloadMetadata(FileReference indexFile) throws HyracksDataException;
+    boolean downloadMetadataFiles(FileReference indexFile) throws HyracksDataException;
 
     /**
      * Remove the deleted files from the uncached file set
@@ -63,12 +92,19 @@
     boolean remove(Collection<FileReference> deletedFiles);
 
     /**
-     * Remove the deleted file from the uncached file set
+     * Remove a file from the uncached file set
      *
-     * @param deletedFile the deleted file
+     * @param fileReference the deleted file
      * @return true if the remaining number of uncached files is zero, false otherwise
      */
-    boolean remove(FileReference deletedFile);
+    boolean remove(FileReference fileReference);
+
+    /**
+     * Add files to indicated that they are not cached anymore
+     *
+     * @param files to be uncached
+     */
+    void add(Collection<FileReference> files);
 
     /**
      * Close cacher resources
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/NoOpParallelCacher.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/NoOpParallelCacher.java
index a77652c..c953de5 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/NoOpParallelCacher.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/NoOpParallelCacher.java
@@ -23,13 +23,14 @@
 import java.util.Collections;
 import java.util.Set;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 
 public class NoOpParallelCacher implements IParallelCacher {
     public static final IParallelCacher INSTANCE = new NoOpParallelCacher();
 
     @Override
-    public boolean isCached(FileReference indexDir) {
+    public boolean isCacheable(FileReference fileReference) {
         return false;
     }
 
@@ -39,12 +40,27 @@
     }
 
     @Override
-    public boolean downloadData(FileReference indexFile) {
+    public long getSize(FileReference fileReference) {
+        return 0L;
+    }
+
+    @Override
+    public long getUncachedTotalSize() {
+        return 0L;
+    }
+
+    @Override
+    public boolean createEmptyDataFiles(FileReference indexFile) throws HyracksDataException {
         return false;
     }
 
     @Override
-    public boolean downloadMetadata(FileReference indexFile) {
+    public boolean downloadDataFiles(FileReference indexFile) {
+        return false;
+    }
+
+    @Override
+    public boolean downloadMetadataFiles(FileReference indexFile) {
         return false;
     }
 
@@ -54,11 +70,16 @@
     }
 
     @Override
-    public boolean remove(FileReference deletedFile) {
+    public boolean remove(FileReference fileReference) {
         return false;
     }
 
     @Override
+    public void add(Collection<FileReference> files) {
+        // NoOp
+    }
+
+    @Override
     public void close() {
         // NoOp
     }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
index 95330d9..bd6644c 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
@@ -18,21 +18,28 @@
  */
 package org.apache.asterix.cloud.lazy;
 
-import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER;
+import static org.apache.asterix.cloud.util.CloudFileUtil.DATA_FILTER;
+import static org.apache.asterix.cloud.util.CloudFileUtil.METADATA_FILTER;
 
 import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
+import org.apache.asterix.cloud.UncachedFileReference;
 import org.apache.asterix.cloud.clients.IParallelDownloader;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.IoUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -42,95 +49,108 @@
  * @see org.apache.asterix.cloud.lazy.accessor.ReplaceableCloudAccessor
  */
 public final class ParallelCacher implements IParallelCacher {
-
-    public static final FilenameFilter METADATA_FILTER =
-            ((dir, name) -> name.startsWith(StorageConstants.INDEX_NON_DATA_FILES_PREFIX));
     private static final Logger LOGGER = LogManager.getLogger();
     private final IParallelDownloader downloader;
     /**
      * Uncached Indexes subpaths
      */
     private final Set<String> uncachedIndexes;
+    private final boolean checkEmpty;
 
     /**
      * All uncached data files
      * Example: BTree files
      */
-    private final Set<FileReference> uncachedDataFiles;
+    private final Map<FileReference, UncachedFileReference> uncachedDataFiles;
 
     /**
      * All uncached metadata files
      * Example: index checkpoint files
      */
-    private final Set<FileReference> uncachedMetadataFiles;
+    private final Map<FileReference, UncachedFileReference> uncachedMetadataFiles;
 
-    public ParallelCacher(IParallelDownloader downloader, List<FileReference> uncachedFiles) {
+    public ParallelCacher(IParallelDownloader downloader, List<FileReference> uncachedFiles, boolean checkEmpty) {
         this.downloader = downloader;
-        uncachedDataFiles = getFiles(uncachedFiles, COMPONENT_FILES_FILTER);
-        uncachedMetadataFiles = getFiles(uncachedFiles, METADATA_FILTER);
+        uncachedDataFiles = new ConcurrentHashMap<>(getFiles(uncachedFiles, DATA_FILTER));
+        uncachedMetadataFiles = new ConcurrentHashMap<>(getFiles(uncachedFiles, METADATA_FILTER));
         uncachedIndexes = getUncachedIndexes(uncachedFiles);
+        this.checkEmpty = checkEmpty;
     }
 
     @Override
-    public boolean isCached(FileReference indexDir) {
-        String relativePath = indexDir.getRelativePath();
-        if (relativePath.endsWith(StorageConstants.STORAGE_ROOT_DIR_NAME)
-                || relativePath.startsWith(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME)) {
-            return false;
+    public boolean isCacheable(FileReference fileReference) {
+        if (isDataFile(fileReference)) {
+            return uncachedDataFiles.containsKey(fileReference);
+        } else {
+            return uncachedMetadataFiles.containsKey(fileReference);
         }
-        String indexSubPath = StoragePathUtil.getIndexSubPath(indexDir, true);
-        return !indexSubPath.isEmpty() && !uncachedIndexes.contains(indexSubPath);
     }
 
     @Override
     public Set<FileReference> getUncachedFiles(FileReference dir, FilenameFilter filter) {
         if (dir.getRelativePath().endsWith(StorageConstants.STORAGE_ROOT_DIR_NAME)) {
-            return uncachedDataFiles.stream()
+            return uncachedDataFiles.keySet().stream()
                     .filter(f -> StoragePathUtil.hasSameStorageRoot(dir, f) && filter.accept(null, f.getName()))
                     .collect(Collectors.toSet());
         }
-        return uncachedDataFiles
+        return uncachedDataFiles.keySet()
                 .stream().filter(f -> StoragePathUtil.hasSameStorageRoot(dir, f)
                         && StoragePathUtil.isRelativeParent(dir, f) && filter.accept(null, f.getName()))
                 .collect(Collectors.toSet());
     }
 
     @Override
-    public synchronized boolean downloadData(FileReference indexFile) throws HyracksDataException {
-        String indexSubPath = StoragePathUtil.getIndexSubPath(indexFile, false);
-        Set<FileReference> toDownload = new HashSet<>();
-        for (FileReference fileReference : uncachedDataFiles) {
-            if (fileReference.getRelativePath().contains(indexSubPath)) {
-                toDownload.add(fileReference.getParent());
-            }
+    public long getSize(FileReference fileReference) {
+        UncachedFileReference uncachedFile;
+        if (isDataFile(fileReference)) {
+            uncachedFile = uncachedDataFiles.get(fileReference);
+        } else {
+            uncachedFile = uncachedMetadataFiles.get(fileReference);
         }
 
-        LOGGER.debug("Downloading data files for {} in all partitions: {}", indexSubPath, toDownload);
-        Collection<FileReference> failed = downloader.downloadDirectories(toDownload);
-        if (!failed.isEmpty()) {
-            LOGGER.warn("Failed to download data files {}. Re-downloading: {}", indexSubPath, failed);
-            downloader.downloadFiles(failed);
-        }
-        LOGGER.debug("Finished downloading data files for {}", indexSubPath);
+        return uncachedFile == null ? 0L : uncachedFile.getSize();
+    }
+
+    @Override
+    public long getUncachedTotalSize() {
+        return getTotalSize(uncachedMetadataFiles.values()) + getTotalSize(uncachedDataFiles.values());
+    }
+
+    @Override
+    public synchronized boolean createEmptyDataFiles(FileReference indexFile) throws HyracksDataException {
+        String indexSubPath = StoragePathUtil.getIndexSubPath(indexFile, false);
+        Set<FileReference> toCreate = getForAllPartitions(uncachedDataFiles.values(), indexSubPath);
+
+        LOGGER.debug("Creating empty data files for {} in all partitions: {}", indexSubPath, toCreate);
+        createEmptyFiles(toCreate);
+        LOGGER.debug("Finished creating data files for {}", indexSubPath);
         uncachedIndexes.remove(indexSubPath);
-        uncachedDataFiles.removeIf(f -> f.getRelativePath().contains(indexSubPath));
+        uncachedDataFiles.keySet().removeIf(f -> f.getRelativePath().contains(indexSubPath));
         return isEmpty();
     }
 
     @Override
-    public synchronized boolean downloadMetadata(FileReference indexFile) throws HyracksDataException {
+    public synchronized boolean downloadDataFiles(FileReference indexFile) throws HyracksDataException {
         String indexSubPath = StoragePathUtil.getIndexSubPath(indexFile, false);
-        Set<FileReference> toDownload = new HashSet<>();
-        for (FileReference fileReference : uncachedMetadataFiles) {
-            if (fileReference.getRelativePath().contains(indexSubPath)) {
-                toDownload.add(fileReference);
-            }
-        }
+        Set<FileReference> toDownload = getForAllPartitions(uncachedDataFiles.values(), indexSubPath);
+
+        LOGGER.debug("Downloading data files for {} in all partitions: {}", indexSubPath, toDownload);
+        downloader.downloadFiles(toDownload);
+        LOGGER.debug("Finished downloading data files for {}", indexSubPath);
+        uncachedIndexes.remove(indexSubPath);
+        uncachedDataFiles.keySet().removeIf(f -> f.getRelativePath().contains(indexSubPath));
+        return isEmpty();
+    }
+
+    @Override
+    public synchronized boolean downloadMetadataFiles(FileReference indexFile) throws HyracksDataException {
+        String indexSubPath = StoragePathUtil.getIndexSubPath(indexFile, false);
+        Set<FileReference> toDownload = getForAllPartitions(uncachedMetadataFiles.keySet(), indexSubPath);
 
         LOGGER.debug("Downloading metadata files for {} in all partitions: {}", indexSubPath, toDownload);
         downloader.downloadFiles(toDownload);
         LOGGER.debug("Finished downloading metadata files for {}", indexSubPath);
-        uncachedMetadataFiles.removeAll(toDownload);
+        uncachedMetadataFiles.keySet().removeAll(toDownload);
         return isEmpty();
     }
 
@@ -145,28 +165,36 @@
     }
 
     @Override
-    public boolean remove(FileReference deletedFile) {
-        LOGGER.info("Deleting {}", deletedFile);
-        if (COMPONENT_FILES_FILTER.accept(null, deletedFile.getName())) {
-            uncachedDataFiles.remove(deletedFile);
+    public boolean remove(FileReference fileReference) {
+        LOGGER.info("Deleting {}", fileReference);
+        if (isDataFile(fileReference)) {
+            uncachedDataFiles.remove(fileReference);
         } else {
-            uncachedMetadataFiles.remove(deletedFile);
+            uncachedMetadataFiles.remove(fileReference);
         }
 
         return isEmpty();
     }
 
     @Override
+    public synchronized void add(Collection<FileReference> files) {
+        LOGGER.info("Uncache {}", files);
+        uncachedDataFiles.putAll(getFiles(files, DATA_FILTER));
+        uncachedMetadataFiles.putAll(getFiles(files, METADATA_FILTER));
+    }
+
+    @Override
     public void close() throws HyracksDataException {
         downloader.close();
         LOGGER.info("Parallel cacher was closed");
     }
 
-    public static Set<FileReference> getFiles(List<FileReference> uncachedFiles, FilenameFilter filter) {
-        Set<FileReference> fileReferences = ConcurrentHashMap.newKeySet();
+    public static Map<FileReference, UncachedFileReference> getFiles(Collection<FileReference> uncachedFiles,
+            FilenameFilter filter) {
+        Map<FileReference, UncachedFileReference> fileReferences = new HashMap<>();
         for (FileReference fileReference : uncachedFiles) {
             if (filter.accept(null, fileReference.getName())) {
-                fileReferences.add(fileReference);
+                fileReferences.put(fileReference, (UncachedFileReference) fileReference);
             }
         }
         return fileReferences;
@@ -180,9 +208,48 @@
         return uncachedIndexes;
     }
 
+    private boolean isDataFile(FileReference fileReference) {
+        return DATA_FILTER.accept(null, fileReference.getName());
+    }
+
     private synchronized boolean isEmpty() {
+        if (!checkEmpty) {
+            return false;
+        }
         int totalSize = uncachedDataFiles.size() + uncachedMetadataFiles.size();
         LOGGER.info("Current number of uncached files {}", totalSize);
         return totalSize == 0;
     }
+
+    private static Set<FileReference> getForAllPartitions(Collection<? extends FileReference> uncachedFiles,
+            String indexSubPath) {
+        Set<FileReference> allFiles = new HashSet<>();
+        for (FileReference fileReference : uncachedFiles) {
+            if (fileReference.getRelativePath().contains(indexSubPath)) {
+                allFiles.add(fileReference);
+            }
+        }
+
+        return allFiles;
+    }
+
+    private static long getTotalSize(Collection<UncachedFileReference> fileReferences) {
+        long size = 0L;
+        for (UncachedFileReference uncached : fileReferences) {
+            size += uncached.getSize();
+        }
+        return size;
+    }
+
+    private static void createEmptyFiles(Set<FileReference> toCreate) throws HyracksDataException {
+        for (FileReference fileReference : toCreate) {
+            IoUtil.create(fileReference);
+            try (RandomAccessFile raf = new RandomAccessFile(fileReference.getAbsolutePath(), "rw")) {
+                raf.setLength(((UncachedFileReference) fileReference).getSize());
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
+        }
+    }
+
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
index 534ff5d..48f2ec7 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
@@ -26,22 +26,72 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 
+/**
+ * An abstraction for lazy I/O operations
+ */
 public interface ILazyAccessor {
+    /**
+     * @return whether this is local accessor
+     */
     boolean isLocalAccessor();
 
+    /**
+     * @return a callback for bulk operation
+     */
     IBulkOperationCallBack getBulkOperationCallBack();
 
+    /**
+     * Notify opening a file
+     *
+     * @param fileHandle to open
+     */
     void doOnOpen(CloudFileHandle fileHandle) throws HyracksDataException;
 
+    /**
+     * List a directory
+     *
+     * @param dir    to list
+     * @param filter filter to return only specific file
+     * @return set of all files that in a directory and satisfy the filter
+     */
     Set<FileReference> doList(FileReference dir, FilenameFilter filter) throws HyracksDataException;
 
+    /**
+     * Checks whether a file exits
+     *
+     * @param fileRef to check
+     * @return true if exists, false otherwise
+     */
     boolean doExists(FileReference fileRef) throws HyracksDataException;
 
+    /**
+     * Get a size of a file
+     *
+     * @param fileReference to get the size of
+     * @return size in bytes
+     */
     long doGetSize(FileReference fileReference) throws HyracksDataException;
 
+    /**
+     * Read all bytes of a file
+     *
+     * @param fileReference to read
+     * @return read bytes
+     */
     byte[] doReadAllBytes(FileReference fileReference) throws HyracksDataException;
 
+    /**
+     * Delete a file
+     *
+     * @param fileReference to delete
+     */
     void doDelete(FileReference fileReference) throws HyracksDataException;
 
+    /**
+     * Overwrite a file with the provided data
+     *
+     * @param fileReference to overwrite
+     * @param bytes         to be written
+     */
     void doOverwrite(FileReference fileReference, byte[] bytes) throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessorReplacer.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessorReplacer.java
index 3a4ff8a..b560be4 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessorReplacer.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessorReplacer.java
@@ -18,6 +18,10 @@
  */
 package org.apache.asterix.cloud.lazy.accessor;
 
+@FunctionalInterface
 public interface ILazyAccessorReplacer {
+    /**
+     * Replace the {@link ILazyAccessor}
+     */
     void replace();
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
index e4e168e..c532674 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
@@ -24,6 +24,7 @@
 
 import org.apache.asterix.cloud.CloudFileHandle;
 import org.apache.asterix.cloud.bulk.IBulkOperationCallBack;
+import org.apache.asterix.cloud.clients.CloudFile;
 import org.apache.asterix.cloud.clients.ICloudClient;
 import org.apache.asterix.cloud.lazy.IParallelCacher;
 import org.apache.asterix.common.utils.StorageConstants;
@@ -42,7 +43,7 @@
     private static final Logger LOGGER = LogManager.getLogger();
     private final Set<Integer> partitions;
     private final ILazyAccessorReplacer replacer;
-    private final IParallelCacher cacher;
+    protected final IParallelCacher cacher;
     private final IBulkOperationCallBack deleteCallBack;
 
     public ReplaceableCloudAccessor(ICloudClient cloudClient, String bucket, IOManager localIoManager,
@@ -72,7 +73,7 @@
     public void doOnOpen(CloudFileHandle fileHandle) throws HyracksDataException {
         FileReference fileRef = fileHandle.getFileReference();
         if (!localIoManager.exists(fileRef) && cloudClient.exists(bucket, fileRef.getRelativePath())) {
-            if (cacher.downloadData(fileRef)) {
+            if (cacher.downloadDataFiles(fileRef)) {
                 replace();
             }
         }
@@ -89,14 +90,9 @@
         return localList;
     }
 
-    private static boolean isTxnDir(FileReference dir) {
-        return dir.getRelativePath().startsWith(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME)
-                || dir.getName().equals(StorageConstants.GLOBAL_TXN_DIR_NAME);
-    }
-
     @Override
     public boolean doExists(FileReference fileRef) throws HyracksDataException {
-        return localIoManager.exists(fileRef) || cloudClient.exists(bucket, fileRef.getRelativePath());
+        return localIoManager.exists(fileRef) || cacher.isCacheable(fileRef);
     }
 
     @Override
@@ -104,13 +100,13 @@
         if (localIoManager.exists(fileReference)) {
             return localIoManager.getSize(fileReference);
         }
-        return cloudClient.getObjectSize(bucket, fileReference.getRelativePath());
+        return cacher.getSize(fileReference);
     }
 
     @Override
     public byte[] doReadAllBytes(FileReference fileRef) throws HyracksDataException {
         if (!localIoManager.exists(fileRef) && isInNodePartition(fileRef.getRelativePath())) {
-            if (cacher.downloadMetadata(fileRef)) {
+            if (cacher.downloadMetadataFiles(fileRef)) {
                 replace();
             }
         }
@@ -140,7 +136,7 @@
 
     private Set<FileReference> cloudBackedList(FileReference dir, FilenameFilter filter) throws HyracksDataException {
         LOGGER.debug("CLOUD LIST: {}", dir);
-        Set<String> cloudFiles = cloudClient.listObjects(bucket, dir.getRelativePath(), filter);
+        Set<CloudFile> cloudFiles = cloudClient.listObjects(bucket, dir.getRelativePath(), filter);
         if (cloudFiles.isEmpty()) {
             return Collections.emptySet();
         }
@@ -150,7 +146,7 @@
 
         // Reconcile local files and cloud files
         for (FileReference file : localFiles) {
-            String path = file.getRelativePath();
+            CloudFile path = CloudFile.of(file.getRelativePath());
             if (!cloudFiles.contains(path)) {
                 throw new IllegalStateException("Local file is not clean. Offending path: " + path);
             } else {
@@ -160,9 +156,9 @@
         }
 
         // Add the remaining files that are not stored locally in their designated partitions (if any)
-        for (String cloudFile : cloudFiles) {
-            FileReference localFile = localIoManager.resolve(cloudFile);
-            if (isInNodePartition(cloudFile) && StoragePathUtil.hasSameStorageRoot(dir, localFile)) {
+        for (CloudFile cloudFile : cloudFiles) {
+            FileReference localFile = localIoManager.resolve(cloudFile.getPath());
+            if (isInNodePartition(cloudFile.getPath()) && StoragePathUtil.hasSameStorageRoot(dir, localFile)) {
                 localFiles.add(localFile);
             }
         }
@@ -177,4 +173,9 @@
         cacher.close();
         replacer.replace();
     }
+
+    private static boolean isTxnDir(FileReference dir) {
+        return dir.getRelativePath().startsWith(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME)
+                || dir.getName().equals(StorageConstants.GLOBAL_TXN_DIR_NAME);
+    }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/util/CloudFileUtil.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/util/CloudFileUtil.java
index 08cbf43..89ea4b1 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/util/CloudFileUtil.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/util/CloudFileUtil.java
@@ -18,22 +18,33 @@
  */
 package org.apache.asterix.cloud.util;
 
+import java.io.FilenameFilter;
 import java.util.Iterator;
 import java.util.Set;
 
+import org.apache.asterix.cloud.clients.CloudFile;
+import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 public class CloudFileUtil {
     private static final Logger LOGGER = LogManager.getLogger();
 
+    // TODO Should we consider bloomfilter and LAF files as metadata files so that they are downloaded on bootstrap?
+    public static final FilenameFilter METADATA_FILTER =
+            ((dir, name) -> name.startsWith(StorageConstants.INDEX_NON_DATA_FILES_PREFIX)
+                    || name.endsWith(AbstractLSMIndexFileManager.LAF_SUFFIX)
+                    || name.endsWith(AbstractLSMIndexFileManager.BLOOM_FILTER_SUFFIX));
+    public static final FilenameFilter DATA_FILTER = ((dir, name) -> !METADATA_FILTER.accept(dir, name));
+
     private CloudFileUtil() {
     }
 
-    public static void cleanDirectoryFiles(IOManager ioManager, Set<String> cloudFiles, FileReference partitionPath)
+    public static void cleanDirectoryFiles(IOManager ioManager, Set<CloudFile> cloudFiles, FileReference partitionPath)
             throws HyracksDataException {
         // First get the set of local files
         Set<FileReference> localFiles = ioManager.list(partitionPath);
@@ -47,7 +58,7 @@
                 continue;
             }
 
-            String path = file.getRelativePath();
+            CloudFile path = CloudFile.of(file.getRelativePath());
             if (!cloudFiles.contains(path)) {
                 // Delete local files that do not exist in cloud storage (the ground truth for valid files)
                 logDeleteFile(file);
@@ -60,12 +71,13 @@
         }
 
         // Add the remaining files that are not stored locally (if any)
-        for (String cloudFile : cloudFiles) {
-            if (!cloudFile.contains(partitionPath.getRelativePath())) {
+        for (CloudFile cloudFile : cloudFiles) {
+            String cloudFilePath = cloudFile.getPath();
+            if (!cloudFilePath.contains(partitionPath.getRelativePath())) {
                 continue;
             }
             localFiles.add(new FileReference(partitionPath.getDeviceHandle(),
-                    cloudFile.substring(cloudFile.indexOf(partitionPath.getRelativePath()))));
+                    cloudFilePath.substring(cloudFilePath.indexOf(partitionPath.getRelativePath()))));
         }
     }
 
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
index 4277800..d02b34e 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
@@ -81,7 +81,7 @@
             return false;
         }
 
-        cloudWriter = cloudClient.createdWriter(bucket, fullPath, bufferProvider);
+        cloudWriter = cloudClient.createWriter(bucket, fullPath, bufferProvider);
         CloudOutputStream outputStream = new CloudOutputStream(cloudWriter);
         printer.newStream(outputStream);
 
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
index 75c4ec5..d73957e 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
@@ -135,7 +135,7 @@
         byte[] data = new byte[Long.BYTES];
         LongPointable.setLong(data, 0, writeValue);
         IWriteBufferProvider bufferProvider = new WriterSingleBufferProvider(testClient.getWriteBufferSize());
-        ICloudWriter writer = testClient.createdWriter(bucket, path, bufferProvider);
+        ICloudWriter writer = testClient.createWriter(bucket, path, bufferProvider);
         boolean aborted = false;
         try {
             writer.write(data, 0, data.length);
diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/AbstractLSMTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/AbstractLSMTest.java
index 484f372..e6b5534 100644
--- a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/AbstractLSMTest.java
+++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/AbstractLSMTest.java
@@ -55,7 +55,7 @@
     public void a1writeToS3Test() throws IOException {
         IWriteBufferProvider bufferProvider = new WriterSingleBufferProvider(CLOUD_CLIENT.getWriteBufferSize());
         ICloudWriter cloudWriter =
-                CLOUD_CLIENT.createdWriter(PLAYGROUND_CONTAINER, BUCKET_STORAGE_ROOT + "/0_b", bufferProvider);
+                CLOUD_CLIENT.createWriter(PLAYGROUND_CONTAINER, BUCKET_STORAGE_ROOT + "/0_b", bufferProvider);
 
         try {
             ByteBuffer content = createContent(BUFFER_SIZE);
diff --git a/asterixdb/asterix-server/pom.xml b/asterixdb/asterix-server/pom.xml
index a0e5443..cc75514 100644
--- a/asterixdb/asterix-server/pom.xml
+++ b/asterixdb/asterix-server/pom.xml
@@ -131,10 +131,18 @@
               <url>http://asm.objectweb.org/license.html</url>
             </override>
             <override>
+              <gav>asm:asm:3.1</gav>
+              <url>https://asm.ow2.io/license.html</url>
+            </override>
+            <override>
               <gav>com.thoughtworks.paranamer:paranamer:2.3</gav>
               <url>https://github.com/codehaus/paranamer-git/blob/paranamer-2.3/LICENSE.txt</url>
             </override>
             <override>
+              <gav>com.github.jnr:jnr-posix:3.1.19</gav>
+              <url>https://www.eclipse.org/legal/epl-2.0</url>
+            </override>
+            <override>
               <gav>org.codehaus.jettison:jettison:1.1</gav>
               <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
             </override>
diff --git a/asterixdb/src/main/licenses/content/asm.ow2.io_license.html.txt b/asterixdb/src/main/licenses/content/asm.ow2.io_license.html.txt
new file mode 100644
index 0000000..c5aba7b
--- /dev/null
+++ b/asterixdb/src/main/licenses/content/asm.ow2.io_license.html.txt
@@ -0,0 +1,29 @@
+Copyright (c) 2000-2011 INRIA, France Telecom
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+
+1. Redistributions of source code must retain the above copyright
+   notice, this list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright
+   notice, this list of conditions and the following disclaimer in the
+   documentation and/or other materials provided with the distribution.
+
+3. Neither the name of the copyright holders nor the names of its
+   contributors may be used to endorse or promote products derived from
+   this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
index 69d33e8..c75e83a 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
@@ -36,13 +36,13 @@
     public int write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer data)
             throws HyracksDataException {
         ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
-        return cloudIOManager.cloudWrite(handle, offset, data);
+        return cloudIOManager.cloudWrite(handle, data);
     }
 
     @Override
     public long write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer[] data)
             throws HyracksDataException {
         ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
-        return cloudIOManager.cloudWrite(handle, offset, data);
+        return cloudIOManager.cloudWrite(handle, data);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
index 430aac7..83b7629 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
@@ -62,21 +62,19 @@
      * Write to cloud only
      *
      * @param fHandle file handle
-     * @param offset  starting offset
      * @param data    to write
      * @return number of written bytes
      */
-    int cloudWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
+    int cloudWrite(IFileHandle fHandle, ByteBuffer data) throws HyracksDataException;
 
     /**
      * Write to cloud only
      *
      * @param fHandle file handle
-     * @param offset  starting offset
      * @param data    to write
      * @return number of written bytes
      */
-    long cloudWrite(IFileHandle fHandle, long offset, ByteBuffer[] data) throws HyracksDataException;
+    long cloudWrite(IFileHandle fHandle, ByteBuffer[] data) throws HyracksDataException;
 
     /**
      * Punch a hole in a file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileReference.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileReference.java
index 3fb682c..b9d9c10 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileReference.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hyracks.storage.common.compression.file;
 
-import java.util.Objects;
-
 import org.apache.hyracks.api.compression.ICompressorDecompressor;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IODeviceHandle;
@@ -57,19 +55,6 @@
         return true;
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof CompressedFileReference)) {
-            return false;
-        }
-        return super.equals(o) && lafPath.equals(((CompressedFileReference) o).lafPath);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(super.getRelativePath(), lafPath);
-    }
-
     /**
      * @return the relative path for LAF file
      */