[ASTERIXDB-3218][*DB] Utilize local accessor when everything is cached

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
With the lazy caching policy, every I/O request (e.g., list) has
to be accompanied by a cloud storage request to reconcile any
differences between the local cache and what is stored in the
cloud. However, those requests are not necessary if both
the cloud and the local cache are in sync. This patch removes
all those unnecessary cloud requests once everything is cached.

Change-Id: I0c5c2c09ce62930cec109e465f4fcff01423889c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17640
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 49f71f4..3288444 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -26,11 +26,9 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
 import org.apache.asterix.cloud.clients.CloudClientProvider;
@@ -151,11 +149,6 @@
      */
 
     @Override
-    public boolean exists(FileReference fileRef) throws HyracksDataException {
-        return localIoManager.exists(fileRef) || cloudClient.exists(bucket, fileRef.getRelativePath());
-    }
-
-    @Override
     public final IFileHandle open(FileReference fileRef, FileReadWriteMode rwMode, FileSyncMode syncMode)
             throws HyracksDataException {
         CloudFileHandle fHandle = new CloudFileHandle(cloudClient, bucket, fileRef, writeBufferProvider);
@@ -204,20 +197,6 @@
     }
 
     @Override
-    public final void delete(FileReference fileRef) throws HyracksDataException {
-        // Never delete the storage dir in cloud storage
-        if (!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileRef.getAbsolutePath()))) {
-            File localFile = fileRef.getFile();
-            // if file reference exists,and it is a file, then list is not required
-            Set<String> paths =
-                    localFile.exists() && localFile.isFile() ? Collections.singleton(fileRef.getRelativePath())
-                            : list(fileRef).stream().map(FileReference::getRelativePath).collect(Collectors.toSet());
-            cloudClient.deleteObjects(bucket, paths);
-        }
-        localIoManager.delete(fileRef);
-    }
-
-    @Override
     public IIOBulkOperation createDeleteBulkOperation() {
         return new DeleteBulkCloudOperation(localIoManager, bucket, cloudClient);
     }
@@ -259,13 +238,6 @@
     }
 
     @Override
-    public final void overwrite(FileReference fileRef, byte[] bytes) throws HyracksDataException {
-        // Write here will overwrite the older object if exists
-        cloudClient.write(bucket, fileRef.getRelativePath(), bytes);
-        localIoManager.overwrite(fileRef, bytes);
-    }
-
-    @Override
     public final void create(FileReference fileRef) throws HyracksDataException {
         // We need to delete the local file on create as the cloud storage didn't complete the upload
         // In other words, both cloud files and the local files are not in sync
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
index 0c61957..f869f37 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
@@ -20,12 +20,17 @@
 
 import static org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
 
+import java.io.File;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.common.config.CloudProperties;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -37,7 +42,7 @@
  * OR
  * - {@link AbstractCloudIOManager}
  */
-class EagerCloudIOManager extends AbstractCloudIOManager {
+final class EagerCloudIOManager extends AbstractCloudIOManager {
     private static final Logger LOGGER = LogManager.getLogger();
 
     public EagerCloudIOManager(IOManager ioManager, CloudProperties cloudProperties) throws HyracksDataException {
@@ -66,4 +71,30 @@
     protected void onOpen(CloudFileHandle fileHandle, FileReadWriteMode rwMode, FileSyncMode syncMode) {
         // NoOp
     }
+
+    @Override
+    public boolean exists(FileReference fileRef) throws HyracksDataException {
+        return localIoManager.exists(fileRef);
+    }
+
+    @Override
+    public void delete(FileReference fileRef) throws HyracksDataException {
+        // Never delete the storage dir in cloud storage
+        if (!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileRef.getAbsolutePath()))) {
+            File localFile = fileRef.getFile();
+            // if file reference exists,and it is a file, then list is not required
+            Set<String> paths =
+                    localFile.exists() && localFile.isFile() ? Collections.singleton(fileRef.getRelativePath())
+                            : list(fileRef).stream().map(FileReference::getRelativePath).collect(Collectors.toSet());
+            cloudClient.deleteObjects(bucket, paths);
+        }
+        localIoManager.delete(fileRef);
+    }
+
+    @Override
+    public void overwrite(FileReference fileRef, byte[] bytes) throws HyracksDataException {
+        // Write here will overwrite the older object if exists
+        cloudClient.write(bucket, fileRef.getRelativePath(), bytes);
+        localIoManager.overwrite(fileRef, bytes);
+    }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 2232dcd..0a83687 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -18,19 +18,24 @@
  */
 package org.apache.asterix.cloud;
 
-import static org.apache.asterix.common.utils.StorageConstants.PARTITION_DIR_PREFIX;
+import static org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
 
-import java.io.File;
 import java.io.FilenameFilter;
-import java.nio.ByteBuffer;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.stream.Collectors;
 
-import org.apache.asterix.cloud.util.CloudFileUtil;
+import org.apache.asterix.cloud.lazy.accessor.ILazyAccessor;
+import org.apache.asterix.cloud.lazy.accessor.ILazyAccessorReplacer;
+import org.apache.asterix.cloud.lazy.accessor.InitialCloudAccessor;
+import org.apache.asterix.cloud.lazy.accessor.LocalAccessor;
+import org.apache.asterix.cloud.lazy.accessor.ReplaceableCloudAccessor;
 import org.apache.asterix.common.config.CloudProperties;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IODeviceHandle;
+import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -38,12 +43,24 @@
 /**
  * CloudIOManager with lazy caching
  * - Overrides some of {@link IOManager} functions
+ * Note: once everything is cached, this will eventually be similar to {@link EagerCloudIOManager}
  */
-class LazyCloudIOManager extends AbstractCloudIOManager {
+final class LazyCloudIOManager extends AbstractCloudIOManager {
     private static final Logger LOGGER = LogManager.getLogger();
+    private final ILazyAccessorReplacer replacer;
+    private ILazyAccessor accessor;
 
     public LazyCloudIOManager(IOManager ioManager, CloudProperties cloudProperties) throws HyracksDataException {
         super(ioManager, cloudProperties);
+        accessor = new InitialCloudAccessor(cloudClient, bucket, localIoManager, writeBufferProvider);
+        replacer = () -> {
+            synchronized (this) {
+                if (!accessor.isLocalAccessor()) {
+                    LOGGER.warn("Replacing cloud-accessor to local-accessor");
+                    accessor = new LocalAccessor(cloudClient, bucket, localIoManager);
+                }
+            }
+        };
     }
 
     /*
@@ -53,28 +70,40 @@
      */
 
     @Override
-    protected void downloadPartitions() {
-        // NoOp
+    protected void downloadPartitions() throws HyracksDataException {
+        // Get the files in all relevant partitions from the cloud
+        Set<String> cloudFiles = cloudClient.listObjects(bucket, STORAGE_ROOT_DIR_NAME, IoUtil.NO_OP_FILTER).stream()
+                .filter(f -> partitions.contains(StoragePathUtil.getPartitionNumFromRelativePath(f)))
+                .collect(Collectors.toSet());
+
+        // Get all files stored locally
+        Set<String> localFiles = new HashSet<>();
+        for (IODeviceHandle deviceHandle : getIODevices()) {
+            FileReference storageRoot = deviceHandle.createFileRef(STORAGE_ROOT_DIR_NAME);
+            Set<FileReference> deviceFiles = localIoManager.list(storageRoot, IoUtil.NO_OP_FILTER);
+            for (FileReference fileReference : deviceFiles) {
+                localFiles.add(fileReference.getRelativePath());
+            }
+        }
+
+        // Keep uncached files list (i.e., files exists in cloud only)
+        cloudFiles.removeAll(localFiles);
+        int remainingUncachedFiles = cloudFiles.size();
+        if (remainingUncachedFiles > 0) {
+            // Local cache misses some files, cloud-based accessor is needed for read operations
+            accessor = new ReplaceableCloudAccessor(cloudClient, bucket, localIoManager, partitions,
+                    remainingUncachedFiles, writeBufferProvider, replacer);
+        } else {
+            // Everything is cached, no need to invoke cloud-based accessor for read operations
+            accessor = new LocalAccessor(cloudClient, bucket, localIoManager);
+        }
+        LOGGER.info("The number of uncached files: {}", remainingUncachedFiles);
     }
 
     @Override
     protected void onOpen(CloudFileHandle fileHandle, FileReadWriteMode rwMode, FileSyncMode syncMode)
             throws HyracksDataException {
-        FileReference fileRef = fileHandle.getFileReference();
-        if (!localIoManager.exists(fileRef) && cloudClient.exists(bucket, fileRef.getRelativePath())) {
-            // File doesn't exist locally, download it.
-            ByteBuffer writeBuffer = writeBufferProvider.getBuffer();
-            try {
-                // TODO download for all partitions at once
-                LOGGER.info("Downloading {} from S3..", fileRef.getRelativePath());
-                CloudFileUtil.downloadFile(localIoManager, cloudClient, bucket, fileHandle, rwMode, syncMode,
-                        writeBuffer);
-                localIoManager.close(fileHandle);
-                LOGGER.info("Finished downloading {} from S3..", fileRef.getRelativePath());
-            } finally {
-                writeBufferProvider.recycle(writeBuffer);
-            }
-        }
+        accessor.doOnOpen(fileHandle, rwMode, syncMode);
     }
 
     /*
@@ -84,58 +113,31 @@
      */
     @Override
     public Set<FileReference> list(FileReference dir, FilenameFilter filter) throws HyracksDataException {
-        Set<String> cloudFiles = cloudClient.listObjects(bucket, dir.getRelativePath(), filter);
-        if (cloudFiles.isEmpty()) {
-            return Collections.emptySet();
-        }
+        return accessor.doList(dir, filter);
+    }
 
-        // First get the set of local files
-        Set<FileReference> localFiles = localIoManager.list(dir, filter);
-
-        // Reconcile local files and cloud files
-        for (FileReference file : localFiles) {
-            String path = file.getRelativePath();
-            if (!cloudFiles.contains(path)) {
-                throw new IllegalStateException("Local file is not clean");
-            } else {
-                // No need to re-add it in the following loop
-                cloudFiles.remove(path);
-            }
-        }
-
-        // Add the remaining files that are not stored locally in their designated partitions (if any)
-        for (String cloudFile : cloudFiles) {
-            FileReference localFile = resolve(cloudFile);
-            if (isInNodePartition(cloudFile) && dir.getDeviceHandle().equals(localFile.getDeviceHandle())) {
-                localFiles.add(localFile);
-            }
-        }
-        return new HashSet<>(localFiles);
+    @Override
+    public boolean exists(FileReference fileRef) throws HyracksDataException {
+        return accessor.doExists(fileRef);
     }
 
     @Override
     public long getSize(FileReference fileReference) throws HyracksDataException {
-        if (localIoManager.exists(fileReference)) {
-            return localIoManager.getSize(fileReference);
-        }
-        return cloudClient.getObjectSize(bucket, fileReference.getRelativePath());
+        return accessor.doGetSize(fileReference);
     }
 
     @Override
     public byte[] readAllBytes(FileReference fileRef) throws HyracksDataException {
-        if (!localIoManager.exists(fileRef) && isInNodePartition(fileRef.getRelativePath())) {
-            byte[] bytes = cloudClient.readAllBytes(bucket, fileRef.getRelativePath());
-            if (bytes != null && !partitions.isEmpty()) {
-                localIoManager.overwrite(fileRef, bytes);
-            }
-            return bytes;
-        }
-        return localIoManager.readAllBytes(fileRef);
+        return accessor.doReadAllBytes(fileRef);
     }
 
-    private boolean isInNodePartition(String path) {
-        int start = path.indexOf(PARTITION_DIR_PREFIX) + PARTITION_DIR_PREFIX.length();
-        int length = path.indexOf(File.separatorChar, start);
-        return partitions.contains(Integer.parseInt(path.substring(start, length)));
+    @Override
+    public void delete(FileReference fileRef) throws HyracksDataException {
+        accessor.doDelete(fileRef);
+    }
+
+    @Override
+    public void overwrite(FileReference fileRef, byte[] bytes) throws HyracksDataException {
+        accessor.doOverwrite(fileRef, bytes);
     }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/AbstractLazyAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/AbstractLazyAccessor.java
new file mode 100644
index 0000000..de7efc1
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/AbstractLazyAccessor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy.accessor;
+
+import static org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.control.nc.io.IOManager;
+
+abstract class AbstractLazyAccessor implements ILazyAccessor {
+    protected final ICloudClient cloudClient;
+    protected final String bucket;
+    protected final IOManager localIoManager;
+
+    AbstractLazyAccessor(ICloudClient cloudClient, String bucket, IOManager localIoManager) {
+        this.cloudClient = cloudClient;
+        this.bucket = bucket;
+        this.localIoManager = localIoManager;
+    }
+
+    int doCloudDelete(FileReference fileReference) throws HyracksDataException {
+        int numberOfCloudDeletes = 0;
+        if (!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileReference.getAbsolutePath()))) {
+            File localFile = fileReference.getFile();
+            // if file reference exists,and it is a file, then list is not required
+            Set<String> paths =
+                    localFile.exists() && localFile.isFile() ? Collections.singleton(fileReference.getRelativePath())
+                            : doList(fileReference, IoUtil.NO_OP_FILTER).stream().map(FileReference::getRelativePath)
+                                    .collect(Collectors.toSet());
+            cloudClient.deleteObjects(bucket, paths);
+            numberOfCloudDeletes = paths.size();
+        }
+        return numberOfCloudDeletes;
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
new file mode 100644
index 0000000..efedd64
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy.accessor;
+
+import java.io.FilenameFilter;
+import java.util.Set;
+
+import org.apache.asterix.cloud.CloudFileHandle;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+
+public interface ILazyAccessor {
+    boolean isLocalAccessor();
+
+    void doOnOpen(CloudFileHandle fileHandle, IIOManager.FileReadWriteMode rwMode, IIOManager.FileSyncMode syncMode)
+            throws HyracksDataException;
+
+    Set<FileReference> doList(FileReference dir, FilenameFilter filter) throws HyracksDataException;
+
+    boolean doExists(FileReference fileRef) throws HyracksDataException;
+
+    long doGetSize(FileReference fileReference) throws HyracksDataException;
+
+    byte[] doReadAllBytes(FileReference fileReference) throws HyracksDataException;
+
+    void doDelete(FileReference fileReference) throws HyracksDataException;
+
+    void doOverwrite(FileReference fileReference, byte[] bytes) throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessorReplacer.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessorReplacer.java
new file mode 100644
index 0000000..3a4ff8a
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessorReplacer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy.accessor;
+
+public interface ILazyAccessorReplacer {
+    void replace();
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
new file mode 100644
index 0000000..b93da54
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy.accessor;
+
+import java.util.Collections;
+
+import org.apache.asterix.cloud.WriteBufferProvider;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.hyracks.control.nc.io.IOManager;
+
+/**
+ * Initial accessor to allow {@link org.apache.asterix.common.transactions.IGlobalTransactionContext} to work before
+ * initializing the NC's partitions
+ */
+public class InitialCloudAccessor extends ReplaceableCloudAccessor {
+    private static final ILazyAccessorReplacer NO_OP_REPLACER = () -> {
+    };
+
+    public InitialCloudAccessor(ICloudClient cloudClient, String bucket, IOManager localIoManager,
+            WriteBufferProvider writeBufferProvider) {
+        super(cloudClient, bucket, localIoManager, Collections.emptySet(), 0, writeBufferProvider, NO_OP_REPLACER);
+    }
+
+    @Override
+    protected void decrementNumberOfUncachedFiles() {
+        // No Op
+    }
+
+    @Override
+    protected void decrementNumberOfUncachedFiles(int count) {
+        // No Op
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java
new file mode 100644
index 0000000..6c480ec
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy.accessor;
+
+import java.io.FilenameFilter;
+import java.util.Set;
+
+import org.apache.asterix.cloud.CloudFileHandle;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.control.nc.io.IOManager;
+
+/**
+ * LocalAccessor would be used once everything in the cloud storage is cached locally
+ */
+public class LocalAccessor extends AbstractLazyAccessor {
+
+    public LocalAccessor(ICloudClient cloudClient, String bucket, IOManager localIoManager) {
+        super(cloudClient, bucket, localIoManager);
+    }
+
+    @Override
+    public boolean isLocalAccessor() {
+        return true;
+    }
+
+    @Override
+    public void doOnOpen(CloudFileHandle fileHandle, IIOManager.FileReadWriteMode rwMode,
+            IIOManager.FileSyncMode syncMode) throws HyracksDataException {
+        // NoOp
+    }
+
+    @Override
+    public Set<FileReference> doList(FileReference dir, FilenameFilter filter) throws HyracksDataException {
+        return localIoManager.list(dir, filter);
+    }
+
+    @Override
+    public boolean doExists(FileReference fileRef) throws HyracksDataException {
+        return localIoManager.exists(fileRef);
+    }
+
+    @Override
+    public long doGetSize(FileReference fileReference) throws HyracksDataException {
+        return localIoManager.getSize(fileReference);
+    }
+
+    @Override
+    public byte[] doReadAllBytes(FileReference fileReference) throws HyracksDataException {
+        return localIoManager.readAllBytes(fileReference);
+    }
+
+    @Override
+    public void doDelete(FileReference fileReference) throws HyracksDataException {
+        // Never delete the storage dir in cloud storage
+        doCloudDelete(fileReference);
+        localIoManager.delete(fileReference);
+    }
+
+    @Override
+    public void doOverwrite(FileReference fileReference, byte[] bytes) throws HyracksDataException {
+        cloudClient.write(bucket, fileReference.getRelativePath(), bytes);
+        localIoManager.overwrite(fileReference, bytes);
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
new file mode 100644
index 0000000..e36ac66
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy.accessor;
+
+import java.io.FilenameFilter;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.asterix.cloud.CloudFileHandle;
+import org.apache.asterix.cloud.WriteBufferProvider;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.util.CloudFileUtil;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * ReplaceableCloudAccessor will be used when some (or all) of the files in the cloud storage are not cached locally.
+ * It will be replaced by {@link LocalAccessor} once everything is cached
+ */
+public class ReplaceableCloudAccessor extends AbstractLazyAccessor {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final Set<Integer> partitions;
+    private final AtomicInteger numberOfUncachedFiles;
+    private final WriteBufferProvider writeBufferProvider;
+    private final ILazyAccessorReplacer replacer;
+
+    public ReplaceableCloudAccessor(ICloudClient cloudClient, String bucket, IOManager localIoManager,
+            Set<Integer> partitions, int numberOfUncachedFiles, WriteBufferProvider writeBufferProvider,
+            ILazyAccessorReplacer replacer) {
+        super(cloudClient, bucket, localIoManager);
+        this.partitions = partitions;
+        this.numberOfUncachedFiles = new AtomicInteger(numberOfUncachedFiles);
+        this.writeBufferProvider = writeBufferProvider;
+        this.replacer = replacer;
+    }
+
+    @Override
+    public boolean isLocalAccessor() {
+        return false;
+    }
+
+    @Override
+    public void doOnOpen(CloudFileHandle fileHandle, IIOManager.FileReadWriteMode rwMode,
+            IIOManager.FileSyncMode syncMode) throws HyracksDataException {
+        FileReference fileRef = fileHandle.getFileReference();
+        if (!localIoManager.exists(fileRef) && cloudClient.exists(bucket, fileRef.getRelativePath())) {
+            // File doesn't exist locally, download it.
+            ByteBuffer writeBuffer = writeBufferProvider.getBuffer();
+            try {
+                // TODO download for all partitions at once
+                LOGGER.info("Downloading {} from S3..", fileRef.getRelativePath());
+                CloudFileUtil.downloadFile(localIoManager, cloudClient, bucket, fileHandle, rwMode, syncMode,
+                        writeBuffer);
+                localIoManager.close(fileHandle);
+                LOGGER.info("Finished downloading {} from S3..", fileRef.getRelativePath());
+            } finally {
+                writeBufferProvider.recycle(writeBuffer);
+            }
+            // TODO decrement by the number of downloaded files in all partitions (once the above TODO is fixed)
+            decrementNumberOfUncachedFiles();
+        }
+    }
+
+    @Override
+    public Set<FileReference> doList(FileReference dir, FilenameFilter filter) throws HyracksDataException {
+        Set<String> cloudFiles = cloudClient.listObjects(bucket, dir.getRelativePath(), filter);
+        if (cloudFiles.isEmpty()) {
+            return Collections.emptySet();
+        }
+
+        // First get the set of local files
+        Set<FileReference> localFiles = localIoManager.list(dir, filter);
+
+        // Reconcile local files and cloud files
+        for (FileReference file : localFiles) {
+            String path = file.getRelativePath();
+            if (!cloudFiles.contains(path)) {
+                throw new IllegalStateException("Local file is not clean");
+            } else {
+                // No need to re-add it in the following loop
+                cloudFiles.remove(path);
+            }
+        }
+
+        // Add the remaining files that are not stored locally in their designated partitions (if any)
+        for (String cloudFile : cloudFiles) {
+            FileReference localFile = localIoManager.resolve(cloudFile);
+            if (isInNodePartition(cloudFile) && dir.getDeviceHandle().equals(localFile.getDeviceHandle())) {
+                localFiles.add(localFile);
+            }
+        }
+        return localFiles;
+    }
+
+    @Override
+    public boolean doExists(FileReference fileRef) throws HyracksDataException {
+        return localIoManager.exists(fileRef) || cloudClient.exists(bucket, fileRef.getRelativePath());
+    }
+
+    @Override
+    public long doGetSize(FileReference fileReference) throws HyracksDataException {
+        if (localIoManager.exists(fileReference)) {
+            return localIoManager.getSize(fileReference);
+        }
+        return cloudClient.getObjectSize(bucket, fileReference.getRelativePath());
+    }
+
+    @Override
+    public byte[] doReadAllBytes(FileReference fileRef) throws HyracksDataException {
+        if (!localIoManager.exists(fileRef) && isInNodePartition(fileRef.getRelativePath())) {
+            byte[] bytes = cloudClient.readAllBytes(bucket, fileRef.getRelativePath());
+            if (bytes != null && !partitions.isEmpty()) {
+                // Download the missing file for subsequent reads
+                localIoManager.overwrite(fileRef, bytes);
+                decrementNumberOfUncachedFiles();
+            }
+            return bytes;
+        }
+        return localIoManager.readAllBytes(fileRef);
+    }
+
+    @Override
+    public void doDelete(FileReference fileReference) throws HyracksDataException {
+        // Never delete the storage dir in cloud storage
+        int numberOfCloudDeletes = doCloudDelete(fileReference);
+        // check local
+        if (numberOfCloudDeletes > 0 && localIoManager.exists(fileReference)) {
+            int numberOfLocalDeletes = fileReference.getFile().isFile() ? 1 : localIoManager.list(fileReference).size();
+            // Decrement by number of cloud deletes that have no counterparts locally
+            decrementNumberOfUncachedFiles(numberOfCloudDeletes - numberOfLocalDeletes);
+        }
+
+        // Finally, delete locally
+        localIoManager.delete(fileReference);
+    }
+
+    @Override
+    public void doOverwrite(FileReference fileReference, byte[] bytes) throws HyracksDataException {
+        boolean existsLocally = localIoManager.exists(fileReference);
+        cloudClient.write(bucket, fileReference.getRelativePath(), bytes);
+        localIoManager.overwrite(fileReference, bytes);
+
+        if (!existsLocally) {
+            decrementNumberOfUncachedFiles();
+        }
+    }
+
+    protected void decrementNumberOfUncachedFiles() {
+        replaceAccessor(numberOfUncachedFiles.decrementAndGet());
+    }
+
+    protected void decrementNumberOfUncachedFiles(int count) {
+        if (count > 0) {
+            replaceAccessor(numberOfUncachedFiles.addAndGet(-count));
+        }
+    }
+
+    private boolean isInNodePartition(String path) {
+        return partitions.contains(StoragePathUtil.getPartitionNumFromRelativePath(path));
+    }
+
+    void replaceAccessor(int remainingUncached) {
+        if (remainingUncached > 0) {
+            // Some files still not cached yet
+            return;
+        }
+
+        if (remainingUncached < 0) {
+            // This should not happen, log in case that happen
+            LOGGER.warn("Some files were downloaded multiple times. Reported remaining uncached files = {}",
+                    remainingUncached);
+        }
+        replacer.replace();
+    }
+}