[ASTERIXDB-3217][CLUS] Support pause/resume for cloud deployment

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Upon booting, cluster's partitions will be restored
  according to the configured caching policy (either
  lazily or eagerly).
- Assume cluster state to be HEALTHY when txn
  checkping is missing in cloud deployment.
- This patch also include some minor fixes

Change-Id: I271261c7ba2a4164babb7d4f79aff75bd5401595
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17586
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Al Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 0611545..56f794b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -30,7 +30,8 @@
 
 import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.cloud.CloudIOManager;
+import org.apache.asterix.cloud.CloudManagerProvider;
+import org.apache.asterix.cloud.LocalPartitionBootstrapper;
 import org.apache.asterix.common.api.IConfigValidator;
 import org.apache.asterix.common.api.IConfigValidatorFactory;
 import org.apache.asterix.common.api.ICoordinationService;
@@ -40,6 +41,7 @@
 import org.apache.asterix.common.api.IPropertiesFactory;
 import org.apache.asterix.common.api.IReceptionist;
 import org.apache.asterix.common.api.IReceptionistFactory;
+import org.apache.asterix.common.cloud.IPartitionBootstrapper;
 import org.apache.asterix.common.config.ActiveProperties;
 import org.apache.asterix.common.config.BuildProperties;
 import org.apache.asterix.common.config.CloudProperties;
@@ -95,7 +97,6 @@
 import org.apache.hyracks.client.result.ResultSet;
 import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.nc.NodeControllerService;
-import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.ipc.impl.HyracksConnection;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
@@ -166,6 +167,7 @@
     private IConfigValidator configValidator;
     private IDiskWriteRateLimiterProvider diskWriteRateLimiterProvider;
     private final CloudProperties cloudProperties;
+    private IPartitionBootstrapper partitionBootstrapper;
 
     public NCAppRuntimeContext(INCServiceContext ncServiceContext, NCExtensionManager extensionManager,
             IPropertiesFactory propertiesFactory) {
@@ -194,9 +196,11 @@
             boolean initialRun) throws IOException {
         ioManager = getServiceContext().getIoManager();
         if (isCloudDeployment()) {
-            persistenceIOManager = new CloudIOManager((IOManager) ioManager, cloudProperties);
+            persistenceIOManager = CloudManagerProvider.createIOManager(cloudProperties, ioManager);
+            partitionBootstrapper = CloudManagerProvider.getCloudPartitionBootstrapper(persistenceIOManager);
         } else {
             persistenceIOManager = ioManager;
+            partitionBootstrapper = new LocalPartitionBootstrapper(ioManager);
         }
         int ioQueueLen = getServiceContext().getAppConfig().getInt(NCConfig.Option.IO_QUEUE_SIZE);
         threadExecutor =
@@ -669,4 +673,9 @@
     public CloudProperties getCloudProperties() {
         return cloudProperties;
     }
+
+    @Override
+    public IPartitionBootstrapper getPartitionBootstrapper() {
+        return partitionBootstrapper;
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 2ca3fbc..f6eb123 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -135,10 +135,7 @@
         //read checkpoint file
         Checkpoint checkpointObject = checkpointManager.getLatest();
         if (checkpointObject == null) {
-            //The checkpoint file doesn't exist => Failure happened during NC initialization.
-            //Retry to initialize the NC by setting the state to PERMANENT_DATA_LOSS
-            state = SystemState.PERMANENT_DATA_LOSS;
-            LOGGER.info("The checkpoint file doesn't exist: systemState = PERMANENT_DATA_LOSS");
+            state = appCtx.getPartitionBootstrapper().getSystemStateOnMissingCheckpoint();
             return state;
         }
         long readableSmallestLSN = logMgr.getReadableSmallestLSN();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
index d214088..a44a695 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
@@ -21,9 +21,10 @@
 import java.util.Arrays;
 import java.util.Set;
 
-import org.apache.asterix.cloud.CloudIOManager;
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.cloud.IPartitionBootstrapper;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
@@ -35,28 +36,36 @@
     private static final Logger LOGGER = LogManager.getLogger();
 
     private static final long serialVersionUID = 1L;
-    private final Set<Integer> partitions;
+    private final Set<Integer> storagePartitions;
+    private final boolean metadataNode;
+    private final int metadataPartitionId;
+    private final boolean cleanup;
 
-    public CloudToLocalStorageCachingTask(Set<Integer> partitions) {
-        this.partitions = partitions;
+    public CloudToLocalStorageCachingTask(Set<Integer> storagePartitions, boolean metadataNode, int metadataPartitionId,
+            boolean cleanup) {
+        this.storagePartitions = storagePartitions;
+        this.metadataNode = metadataNode;
+        this.metadataPartitionId = metadataPartitionId;
+        this.cleanup = cleanup;
     }
 
     @Override
     public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
         INcApplicationContext applicationContext = (INcApplicationContext) cs.getApplicationContext();
+        PersistentLocalResourceRepository lrs =
+                (PersistentLocalResourceRepository) applicationContext.getLocalResourceRepository();
 
         String nodeId = applicationContext.getServiceContext().getNodeId();
-        LOGGER.info("Syncing cloud to local storage for node {}. for partitions: {}", nodeId, partitions);
+        LOGGER.info("Initializing Node {} with storage partitions: {}", nodeId, storagePartitions);
 
-        CloudIOManager cloudIOManager = (CloudIOManager) applicationContext.getPersistenceIoManager();
-
-        // TODO(htowaileb): eager caching is disabled for now as it depends on static partitioning work
-        cloudIOManager.syncFiles(partitions);
+        IPartitionBootstrapper bootstrapper = applicationContext.getPartitionBootstrapper();
+        bootstrapper.bootstrap(storagePartitions, lrs.getOnDiskPartitions(), metadataNode, metadataPartitionId,
+                cleanup);
     }
 
     @Override
     public String toString() {
         return "{ \"class\" : \"" + getClass().getSimpleName() + "\", \"partitions\" : "
-                + Arrays.toString(partitions.toArray()) + " }";
+                + Arrays.toString(storagePartitions.toArray()) + " }";
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index 449dd27..11a85ba 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -35,6 +35,7 @@
 
 import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
 import org.apache.asterix.app.nc.task.CheckpointTask;
+import org.apache.asterix.app.nc.task.CloudToLocalStorageCachingTask;
 import org.apache.asterix.app.nc.task.ExportMetadataNodeTask;
 import org.apache.asterix.app.nc.task.LocalRecoveryTask;
 import org.apache.asterix.app.nc.task.LocalStorageCleanupTask;
@@ -48,9 +49,11 @@
 import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
 import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
 import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.cluster.StorageComputePartitionsMap;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
@@ -83,7 +86,7 @@
     private final boolean replicationEnabled;
     private final IGatekeeper gatekeeper;
     Map<String, Map<String, Object>> nodeSecretsMap;
-    private ICCServiceContext serviceContext;
+    private final ICCServiceContext serviceContext;
 
     public NcLifecycleCoordinator(ICCServiceContext serviceCtx, boolean replicationEnabled) {
         this.serviceContext = serviceCtx;
@@ -219,6 +222,8 @@
         Set<Integer> nodeActivePartitions = getNodeActivePartitions(newNodeId, activePartitions, metadataNode);
         tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING, nodeActivePartitions));
         int metadataPartitionId = clusterManager.getMetadataPartition().getPartitionId();
+        // Add any cloud-related tasks
+        addCloudTasks(tasks, nodeActivePartitions, metadataNode, metadataPartitionId);
         tasks.add(new LocalStorageCleanupTask(metadataPartitionId));
         if (state == SystemState.CORRUPTED) {
             // need to perform local recovery for node active partitions
@@ -251,6 +256,19 @@
         return tasks;
     }
 
+    protected void addCloudTasks(List<INCLifecycleTask> tasks, Set<Integer> computePartitions, boolean metadataNode,
+            int metadataPartitionId) {
+        IApplicationContext appCtx = (IApplicationContext) serviceContext.getApplicationContext();
+        if (!appCtx.isCloudDeployment()) {
+            return;
+        }
+
+        StorageComputePartitionsMap map = clusterManager.getStorageComputeMap();
+        map = map == null ? StorageComputePartitionsMap.computePartitionsMap(clusterManager) : map;
+        Set<Integer> storagePartitions = map.getStoragePartitions(computePartitions);
+        tasks.add(new CloudToLocalStorageCachingTask(storagePartitions, metadataNode, metadataPartitionId, false));
+    }
+
     private synchronized void process(MetadataNodeResponseMessage response) throws HyracksDataException {
         // rebind metadata node since it might be changing
         MetadataManager.INSTANCE.rebindMetadataNode();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
index eb2b05b..47c0599 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
@@ -27,10 +27,11 @@
     public static final String CONFIG_FILE = joinPath(RESOURCES_PATH, "cc-cloud-storage.conf");
 
     public static void main(String[] args) throws Exception {
-        CloudUtils.startS3CloudEnvironment();
+        boolean cleanStart = Boolean.getBoolean("cleanup.start");
+        LocalCloudUtil.startS3CloudEnvironment(cleanStart);
         final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
         try {
-            integrationUtil.run(Boolean.getBoolean("cleanup.start"), Boolean.getBoolean("cleanup.shutdown"),
+            integrationUtil.run(cleanStart, Boolean.getBoolean("cleanup.shutdown"),
                     System.getProperty("external.lib", ""), CONFIG_FILE);
         } catch (Exception e) {
             LOGGER.fatal("Unexpected exception", e);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/LocalCloudUtil.java
similarity index 81%
rename from asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudUtils.java
rename to asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/LocalCloudUtil.java
index 352dc06..39af667 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudUtils.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/LocalCloudUtil.java
@@ -18,8 +18,12 @@
  */
 package org.apache.asterix.api.common;
 
+import static org.apache.hyracks.util.file.FileUtil.joinPath;
+
+import java.io.File;
 import java.net.URI;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -30,7 +34,7 @@
 import software.amazon.awssdk.services.s3.S3ClientBuilder;
 import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
 
-public class CloudUtils {
+public class LocalCloudUtil {
 
     private static final Logger LOGGER = LogManager.getLogger();
 
@@ -38,20 +42,26 @@
     public static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:" + MOCK_SERVER_PORT;
     public static final String CLOUD_STORAGE_BUCKET = "cloud-storage-container";
     public static final String MOCK_SERVER_REGION = "us-west-2";
+    private static final String MOCK_FILE_BACKEND = joinPath("target", "s3mock");
     private static S3Mock s3MockServer;
 
-    private CloudUtils() {
+    private LocalCloudUtil() {
         throw new AssertionError("Do not instantiate");
     }
 
     public static void main(String[] args) {
-        startS3CloudEnvironment();
+        // Change to 'true' if you want to delete "s3mock" folder on start
+        startS3CloudEnvironment(true);
     }
 
-    public static void startS3CloudEnvironment() {
+    public static void startS3CloudEnvironment(boolean cleanStart) {
+        if (cleanStart) {
+            FileUtils.deleteQuietly(new File(MOCK_FILE_BACKEND));
+        }
         // Starting S3 mock server to be used instead of real S3 server
         LOGGER.info("Starting S3 mock server");
-        s3MockServer = new S3Mock.Builder().withPort(MOCK_SERVER_PORT).withInMemoryBackend().build();
+        // Use file backend for debugging/inspection
+        s3MockServer = new S3Mock.Builder().withPort(MOCK_SERVER_PORT).withFileBackend(MOCK_FILE_BACKEND).build();
         shutdownSilently();
         try {
             s3MockServer.start();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java
index f3fd805..fd789e8 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java
@@ -20,7 +20,7 @@
 
 import java.util.Collection;
 
-import org.apache.asterix.api.common.CloudUtils;
+import org.apache.asterix.api.common.LocalCloudUtil;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.test.runtime.LangExecutionUtil;
@@ -53,7 +53,7 @@
 
     @BeforeClass
     public static void setUp() throws Exception {
-        CloudUtils.startS3CloudEnvironment();
+        LocalCloudUtil.startS3CloudEnvironment(true);
         LangExecutionUtil.setUp(CONFIG_FILE_NAME, testExecutor);
         System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, CONFIG_FILE_NAME);
     }
diff --git a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf
index 6ffa3c0..7d0b06b 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf
@@ -62,8 +62,10 @@
 messaging.frame.count=512
 cloud.deployment=true
 storage.buffercache.pagesize=32KB
+storage.partitioning=static
 cloud.storage.scheme=s3
 cloud.storage.bucket=cloud-storage-container
 cloud.storage.region=us-west-2
 cloud.storage.endpoint=http://127.0.0.1:8001
-cloud.storage.anonymous.auth=true
\ No newline at end of file
+cloud.storage.anonymous.auth=true
+cloud.storage.cache.policy=lazy
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 74a17e0..a6ab2e0 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -12,6 +12,7 @@
     "cloud.deployment" : false,
     "cloud.storage.anonymous.auth" : false,
     "cloud.storage.bucket" : "",
+    "cloud.storage.cache.policy" : "lazy",
     "cloud.storage.endpoint" : "",
     "cloud.storage.prefix" : "",
     "cloud.storage.region" : "",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index a61c0dd..395c96c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -12,6 +12,7 @@
     "cloud.deployment" : false,
     "cloud.storage.anonymous.auth" : false,
     "cloud.storage.bucket" : "",
+    "cloud.storage.cache.policy" : "lazy",
     "cloud.storage.endpoint" : "",
     "cloud.storage.prefix" : "",
     "cloud.storage.region" : "",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index fa3f5fa..8396311 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -12,6 +12,7 @@
     "cloud.deployment" : false,
     "cloud.storage.anonymous.auth" : false,
     "cloud.storage.bucket" : "",
+    "cloud.storage.cache.policy" : "lazy",
     "cloud.storage.endpoint" : "",
     "cloud.storage.prefix" : "",
     "cloud.storage.region" : "",
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
new file mode 100644
index 0000000..1c06919
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud;
+
+import static org.apache.asterix.common.utils.StorageConstants.METADATA_PARTITION;
+import static org.apache.asterix.common.utils.StorageConstants.PARTITION_DIR_PREFIX;
+import static org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.asterix.cloud.clients.CloudClientProvider;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.util.CloudFileUtil;
+import org.apache.asterix.common.cloud.IPartitionBootstrapper;
+import org.apache.asterix.common.config.CloudProperties;
+import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.util.file.FileUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public abstract class AbstractCloudIOManager extends IOManager implements IPartitionBootstrapper {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final String DATAVERSE_PATH =
+            FileUtil.joinPath(STORAGE_ROOT_DIR_NAME, PARTITION_DIR_PREFIX + METADATA_PARTITION, "Metadata");
+    protected final ICloudClient cloudClient;
+    protected final WriteBufferProvider writeBufferProvider;
+    protected final String bucket;
+    protected final Set<Integer> partitions;
+    protected final List<FileReference> partitionPaths;
+    protected final IOManager localIoManager;
+
+    public AbstractCloudIOManager(IOManager ioManager, CloudProperties cloudProperties) throws HyracksDataException {
+        super(ioManager.getIODevices(), ioManager.getDeviceComputer(), ioManager.getIOParallelism(),
+                ioManager.getQueueSize());
+        this.bucket = cloudProperties.getStorageBucket();
+        cloudClient = CloudClientProvider.getClient(cloudProperties);
+        int numOfThreads = getIODevices().size() * getIOParallelism();
+        writeBufferProvider = new WriteBufferProvider(numOfThreads);
+        partitions = new HashSet<>();
+        partitionPaths = new ArrayList<>();
+        this.localIoManager = ioManager;
+    }
+
+    /*
+     * ******************************************************************
+     * IPartitionBootstrapper functions
+     * ******************************************************************
+     */
+
+    @Override
+    public IRecoveryManager.SystemState getSystemStateOnMissingCheckpoint() {
+        if (cloudClient.listObjects(bucket, DATAVERSE_PATH, IoUtil.NO_OP_FILTER).isEmpty()) {
+            LOGGER.info("First time to initialize this cluster: systemState = PERMANENT_DATA_LOSS");
+            return IRecoveryManager.SystemState.PERMANENT_DATA_LOSS;
+        } else {
+            LOGGER.info("Resuming a previous initialized cluster: systemState = HEALTHY");
+            return IRecoveryManager.SystemState.HEALTHY;
+        }
+    }
+
+    @Override
+    public final void bootstrap(Set<Integer> activePartitions, List<FileReference> currentOnDiskPartitions,
+            boolean metadataNode, int metadataPartition, boolean cleanup) throws HyracksDataException {
+        partitions.clear();
+        partitions.addAll(activePartitions);
+        if (metadataNode) {
+            partitions.add(metadataPartition);
+        }
+
+        partitionPaths.clear();
+        for (Integer partition : activePartitions) {
+            String partitionDir = PARTITION_DIR_PREFIX + partition;
+            partitionPaths.add(resolve(STORAGE_ROOT_DIR_NAME + File.separator + partitionDir));
+        }
+
+        LOGGER.warn("Initializing cloud manager with storage partitions: {}", partitions);
+
+        if (cleanup) {
+            deleteUnkeptPartitionDirs(currentOnDiskPartitions);
+            cleanupLocalFiles();
+        }
+        // Has different implementations depending on the caching policy
+        downloadPartitions();
+    }
+
+    private void deleteUnkeptPartitionDirs(List<FileReference> currentOnDiskPartitions) throws HyracksDataException {
+        for (FileReference partitionDir : currentOnDiskPartitions) {
+            int partitionNum = StoragePathUtil.getPartitionNumFromRelativePath(partitionDir.getRelativePath());
+            if (!partitions.contains(partitionNum)) {
+                LOGGER.warn("Deleting storage partition {} as it does not belong to the current storage partitions {}",
+                        partitionNum, partitions);
+                localIoManager.deleteDirectory(partitionDir);
+            }
+        }
+    }
+
+    private void cleanupLocalFiles() throws HyracksDataException {
+        Set<String> cloudFiles = cloudClient.listObjects(bucket, STORAGE_ROOT_DIR_NAME, IoUtil.NO_OP_FILTER);
+        if (cloudFiles.isEmpty()) {
+            for (FileReference partitionPath : partitionPaths) {
+                if (localIoManager.exists(partitionPath)) {
+                    // Clean local dir from all files
+                    localIoManager.cleanDirectory(partitionPath);
+                }
+            }
+        } else {
+            for (FileReference partitionPath : partitionPaths) {
+                CloudFileUtil.cleanDirectoryFiles(localIoManager, cloudFiles, partitionPath);
+            }
+        }
+    }
+
+    protected abstract void downloadPartitions() throws HyracksDataException;
+
+    /*
+     * ******************************************************************
+     * IIOManager functions
+     * ******************************************************************
+     */
+
+    @Override
+    public boolean exists(FileReference fileRef) throws HyracksDataException {
+        return localIoManager.exists(fileRef) || cloudClient.exists(bucket, fileRef.getRelativePath());
+    }
+
+    @Override
+    public final IFileHandle open(FileReference fileRef, FileReadWriteMode rwMode, FileSyncMode syncMode)
+            throws HyracksDataException {
+        CloudFileHandle fHandle = new CloudFileHandle(cloudClient, bucket, fileRef, writeBufferProvider);
+        onOpen(fHandle, rwMode, syncMode);
+        try {
+            fHandle.open(rwMode, syncMode);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        return fHandle;
+    }
+
+    /**
+     * Action required to do when opening a file
+     *
+     * @param fileHandle file to open
+     */
+    protected abstract void onOpen(CloudFileHandle fileHandle, FileReadWriteMode rwMode, FileSyncMode syncMode)
+            throws HyracksDataException;
+
+    @Override
+    public final long doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray)
+            throws HyracksDataException {
+        long writtenBytes = localIoManager.doSyncWrite(fHandle, offset, dataArray);
+        CloudResettableInputStream inputStream = ((CloudFileHandle) fHandle).getInputStream();
+        try {
+            inputStream.write(dataArray[0], dataArray[1]);
+        } catch (HyracksDataException e) {
+            inputStream.abort();
+            throw e;
+        }
+        return writtenBytes;
+    }
+
+    @Override
+    public final int doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer dataArray) throws HyracksDataException {
+        int writtenBytes = localIoManager.doSyncWrite(fHandle, offset, dataArray);
+        CloudResettableInputStream inputStream = ((CloudFileHandle) fHandle).getInputStream();
+        try {
+            inputStream.write(dataArray);
+        } catch (HyracksDataException e) {
+            inputStream.abort();
+            throw e;
+        }
+        return writtenBytes;
+    }
+
+    @Override
+    public final void delete(FileReference fileRef) throws HyracksDataException {
+        if (!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileRef.getAbsolutePath()))) {
+            // Never delete the storage dir in cloud storage
+            cloudClient.deleteObject(bucket, fileRef.getRelativePath());
+        }
+        localIoManager.delete(fileRef);
+    }
+
+    @Override
+    public final void close(IFileHandle fHandle) throws HyracksDataException {
+        try {
+            CloudFileHandle cloudFileHandle = (CloudFileHandle) fHandle;
+            cloudFileHandle.close();
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public final void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException {
+        HyracksDataException savedEx = null;
+        if (metadata) {
+            // only finish writing if metadata == true to prevent write limiter from finishing the stream and
+            // completing the upload.
+            CloudResettableInputStream stream = ((CloudFileHandle) fileHandle).getInputStream();
+            try {
+                stream.finish();
+            } catch (HyracksDataException e) {
+                savedEx = e;
+            }
+
+            if (savedEx != null) {
+                try {
+                    stream.abort();
+                } catch (HyracksDataException e) {
+                    savedEx.addSuppressed(e);
+                }
+                throw savedEx;
+            }
+        }
+        // Sync only after finalizing the upload to cloud storage
+        localIoManager.sync(fileHandle, metadata);
+    }
+
+    @Override
+    public final void overwrite(FileReference fileRef, byte[] bytes) throws HyracksDataException {
+        // Write here will overwrite the older object if exists
+        cloudClient.write(bucket, fileRef.getRelativePath(), bytes);
+        localIoManager.overwrite(fileRef, bytes);
+    }
+
+    @Override
+    public final void deleteDirectory(FileReference fileRef) throws HyracksDataException {
+        if (!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileRef.getAbsolutePath()))) {
+            // Never delete the storage dir in cloud storage
+            cloudClient.deleteObject(bucket, fileRef.getRelativePath());
+        }
+        localIoManager.deleteDirectory(fileRef);
+    }
+
+    @Override
+    public final void create(FileReference fileRef) throws HyracksDataException {
+        // We need to delete the local file on create as the cloud storage didn't complete the upload
+        // In other words, both cloud files and the local files are not in sync
+        localIoManager.delete(fileRef);
+        localIoManager.create(fileRef);
+    }
+
+    @Override
+    public final void copyDirectory(FileReference srcFileRef, FileReference destFileRef) throws HyracksDataException {
+        cloudClient.copy(bucket, srcFileRef.getRelativePath(), destFileRef);
+        localIoManager.copyDirectory(srcFileRef, destFileRef);
+    }
+
+    // TODO(htowaileb): the localIoManager is closed by the node controller service as well, check if we need this
+    @Override
+    public final void close() throws IOException {
+        cloudClient.close();
+        super.close();
+        localIoManager.close();
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
index 24cb0bb..8572014 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
@@ -43,6 +43,12 @@
         }
     }
 
+    @Override
+    public synchronized void close() throws IOException {
+        inputStream.close();
+        super.close();
+    }
+
     public CloudResettableInputStream getInputStream() {
         return inputStream;
     }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
deleted file mode 100644
index 4939ab5..0000000
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.cloud;
-
-import static org.apache.asterix.common.utils.StorageConstants.PARTITION_DIR_PREFIX;
-import static org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedByInterruptException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.asterix.cloud.clients.CloudClientProvider;
-import org.apache.asterix.cloud.clients.ICloudClient;
-import org.apache.asterix.cloud.clients.aws.s3.S3Utils;
-import org.apache.asterix.common.config.CloudProperties;
-import org.apache.commons.io.FileUtils;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IFileDeviceResolver;
-import org.apache.hyracks.api.io.IFileHandle;
-import org.apache.hyracks.api.io.IODeviceHandle;
-import org.apache.hyracks.api.util.IoUtil;
-import org.apache.hyracks.control.nc.io.FileHandle;
-import org.apache.hyracks.control.nc.io.IOManager;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class CloudIOManager extends IOManager {
-    private static final Logger LOGGER = LogManager.getLogger();
-    private final ICloudClient cloudClient;
-    private final WriteBufferProvider writeBufferProvider;
-    private final String bucket;
-    private IOManager localIoManager;
-
-    private CloudIOManager(List<IODeviceHandle> devices, IFileDeviceResolver deviceComputer, int ioParallelism,
-            int queueSize, CloudProperties cloudProperties) throws HyracksDataException {
-        super(devices, deviceComputer, ioParallelism, queueSize);
-        this.bucket = cloudProperties.getStorageBucket();
-        cloudClient = CloudClientProvider.getClient(cloudProperties);
-        int numOfThreads = getIODevices().size() * getIoParallelism();
-        writeBufferProvider = new WriteBufferProvider(numOfThreads);
-    }
-
-    public CloudIOManager(IOManager ioManager, CloudProperties cloudProperties) throws HyracksDataException {
-        this(ioManager.getIoDevices(), ioManager.getDeviceComputer(), ioManager.getIoParallelism(),
-                ioManager.getQueueSize(), cloudProperties);
-        this.localIoManager = ioManager;
-    }
-
-    public String getBucket() {
-        return bucket;
-    }
-
-    @Override
-    public long doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException {
-        long writtenBytes = super.doSyncWrite(fHandle, offset, dataArray);
-        CloudResettableInputStream inputStream = ((CloudFileHandle) fHandle).getInputStream();
-        try {
-            inputStream.write(dataArray[0], dataArray[1]);
-        } catch (HyracksDataException e) {
-            inputStream.abort();
-            throw e;
-        }
-        return writtenBytes;
-    }
-
-    @Override
-    public int doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer dataArray) throws HyracksDataException {
-        int writtenBytes = super.doSyncWrite(fHandle, offset, dataArray);
-        CloudResettableInputStream inputStream = ((CloudFileHandle) fHandle).getInputStream();
-        try {
-            inputStream.write(dataArray);
-        } catch (HyracksDataException e) {
-            inputStream.abort();
-            throw e;
-        }
-        return writtenBytes;
-    }
-
-    @Override
-    public IFileHandle open(FileReference fileRef, FileReadWriteMode rwMode, FileSyncMode syncMode)
-            throws HyracksDataException {
-
-        CloudFileHandle fHandle = new CloudFileHandle(cloudClient, bucket, fileRef, writeBufferProvider);
-        if (!super.exists(fileRef) && cloudClient.exists(bucket, fileRef.getRelativePath())) {
-            ByteBuffer writeBuffer = writeBufferProvider.getBuffer();
-            try {
-                LOGGER.info("Downloading {} from S3..", fileRef.getRelativePath());
-                downloadFile(fHandle, rwMode, syncMode, writeBuffer);
-                super.close(fHandle);
-                LOGGER.info("Finished downloading {} from S3..", fileRef.getRelativePath());
-            } finally {
-                writeBufferProvider.recycle(writeBuffer);
-            }
-        }
-
-        try {
-            fHandle.open(rwMode, syncMode);
-        } catch (IOException e) {
-            throw HyracksDataException.create(e);
-        }
-        return fHandle;
-    }
-
-    @Override
-    public void delete(FileReference fileRef) throws HyracksDataException {
-        if (!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileRef.getAbsolutePath()))) {
-            // Never delete the storage dir in cloud storage
-            cloudClient.deleteObject(bucket, fileRef.getRelativePath());
-        }
-        super.delete(fileRef);
-    }
-
-    @Override
-    public void close(IFileHandle fHandle) throws HyracksDataException {
-        try {
-            ((CloudFileHandle) fHandle).getInputStream().close();
-        } catch (IOException e) {
-            throw HyracksDataException.create(e);
-        }
-        super.close(fHandle);
-    }
-
-    // TODO This method should not do any syncing. It simply should list the files
-    @Override
-    public Set<FileReference> list(FileReference dir, FilenameFilter filter) throws HyracksDataException {
-        Set<String> cloudFiles = cloudClient.listObjects(bucket, dir.getRelativePath(), filter);
-        if (cloudFiles.isEmpty()) {
-            // TODO(htowaileb): Can we end up in a state where local has files but cloud does not?
-            return Collections.emptySet();
-        }
-
-        // First get the set of local files
-        Set<FileReference> localFiles = super.list(dir, filter);
-        Iterator<FileReference> localFilesIter = localFiles.iterator();
-
-        // Reconcile local files and cloud files
-        while (localFilesIter.hasNext()) {
-            FileReference file = localFilesIter.next();
-            if (file.getFile().isDirectory()) {
-                continue;
-            }
-            String path = S3Utils.toCloudPrefix(file.getRelativePath());
-            if (!cloudFiles.contains(path)) {
-                // Delete local files that do not exist in cloud storage (the ground truth for valid files)
-                localFilesIter.remove();
-                super.delete(file);
-            } else {
-                // No need to re-add it in the following loop
-                cloudFiles.remove(path);
-            }
-        }
-
-        // Add the remaining files that are not stored locally (if any)
-        for (String cloudFile : cloudFiles) {
-            localFiles.add(new FileReference(dir.getDeviceHandle(),
-                    cloudFile.substring(cloudFile.indexOf(dir.getRelativePath()))));
-        }
-        return new HashSet<>(localFiles);
-    }
-
-    @Override
-    public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException {
-        HyracksDataException savedEx = null;
-        if (metadata) {
-            // only finish writing if metadata == true to prevent write limiter from finishing the stream and
-            // completing the upload.
-            CloudResettableInputStream stream = ((CloudFileHandle) fileHandle).getInputStream();
-            try {
-                stream.finish();
-            } catch (HyracksDataException e) {
-                savedEx = e;
-            }
-
-            if (savedEx != null) {
-                try {
-                    stream.abort();
-                } catch (HyracksDataException e) {
-                    savedEx.addSuppressed(e);
-                }
-                throw savedEx;
-            }
-        }
-        // Sync only after finalizing the upload to cloud storage
-        super.sync(fileHandle, metadata);
-    }
-
-    @Override
-    public long getSize(IFileHandle fileHandle) throws HyracksDataException {
-        if (!fileHandle.getFileReference().getFile().exists()) {
-            return cloudClient.getObjectSize(bucket, fileHandle.getFileReference().getRelativePath());
-        }
-        return super.getSize(fileHandle);
-    }
-
-    @Override
-    public long getSize(FileReference fileReference) throws HyracksDataException {
-        if (!fileReference.getFile().exists()) {
-            return cloudClient.getObjectSize(bucket, fileReference.getRelativePath());
-        }
-        return super.getSize(fileReference);
-    }
-
-    @Override
-    public void overwrite(FileReference fileRef, byte[] bytes) throws ClosedByInterruptException, HyracksDataException {
-        super.overwrite(fileRef, bytes);
-        // Write here will overwrite the older object if exists
-        cloudClient.write(bucket, fileRef.getRelativePath(), bytes);
-    }
-
-    @Override
-    public int doSyncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
-        return super.doSyncRead(fHandle, offset, data);
-    }
-
-    @Override
-    public byte[] readAllBytes(FileReference fileRef) throws HyracksDataException {
-        if (!fileRef.getFile().exists()) {
-            IFileHandle open = open(fileRef, FileReadWriteMode.READ_WRITE, FileSyncMode.METADATA_SYNC_DATA_SYNC);
-            fileRef = open.getFileReference();
-        }
-        return super.readAllBytes(fileRef);
-    }
-
-    @Override
-    public void deleteDirectory(FileReference fileRef) throws HyracksDataException {
-        // TODO(htowaileb): Should we delete the cloud first?
-        super.deleteDirectory(fileRef);
-        if (!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileRef.getAbsolutePath()))) {
-            // Never delete the storage dir in cloud storage
-            cloudClient.deleteObject(bucket, fileRef.getRelativePath());
-        }
-    }
-
-    @Override
-    public boolean exists(FileReference fileRef) throws HyracksDataException {
-        // Check if the file exists locally first as newly created files (i.e., they are empty) are not stored in cloud storage
-        return fileRef.getFile().exists() || cloudClient.exists(bucket, fileRef.getRelativePath());
-    }
-
-    @Override
-    public void create(FileReference fileRef) throws HyracksDataException {
-        // We need to delete the local file on create as the cloud storage didn't complete the upload
-        // In other words, both cloud files and the local files are not in sync
-        super.delete(fileRef);
-        super.create(fileRef);
-    }
-
-    @Override
-    public void copyDirectory(FileReference srcFileRef, FileReference destFileRef) throws HyracksDataException {
-        cloudClient.copy(bucket, srcFileRef.getRelativePath(), destFileRef);
-        super.copyDirectory(srcFileRef, destFileRef);
-    }
-
-    protected long writeLocally(IFileHandle fHandle, long offset, ByteBuffer buffer) throws HyracksDataException {
-        return super.doSyncWrite(fHandle, offset, buffer);
-    }
-
-    protected void syncLocally(IFileHandle fileHandle) throws HyracksDataException {
-        super.sync(fileHandle, true);
-    }
-
-    @Override
-    public void syncFiles(Set<Integer> activePartitions) throws HyracksDataException {
-        Map<String, String> cloudToLocalStoragePaths = new HashMap<>();
-        for (Integer partition : activePartitions) {
-            String partitionToFind = PARTITION_DIR_PREFIX + partition + "/";
-            IODeviceHandle deviceHandle = getDeviceComputer().resolve(partitionToFind, getIODevices());
-
-            String cloudStoragePath = STORAGE_ROOT_DIR_NAME + "/" + partitionToFind;
-            String localStoragePath = deviceHandle.getMount().getAbsolutePath() + "/" + cloudStoragePath;
-            cloudToLocalStoragePaths.put(cloudStoragePath, localStoragePath);
-        }
-        LOGGER.info("Resolved paths to io devices: {}", cloudToLocalStoragePaths);
-        cloudClient.syncFiles(bucket, cloudToLocalStoragePaths);
-    }
-
-    // TODO(htowaileb): the localIoManager is closed by the node controller service as well, check if we need this
-    @Override
-    public void close() throws IOException {
-        cloudClient.close();
-        super.close();
-        localIoManager.close();
-    }
-
-    private void downloadFile(FileHandle fileHandle, FileReadWriteMode rwMode, FileSyncMode syncMode,
-            ByteBuffer writeBuffer) throws HyracksDataException {
-        FileReference fileRef = fileHandle.getFileReference();
-        File file = fileRef.getFile();
-
-        try (InputStream inputStream = cloudClient.getObjectStream(bucket, fileRef.getRelativePath())) {
-            FileUtils.createParentDirectories(file);
-            if (!file.createNewFile()) {
-                throw new IllegalStateException("Couldn't create local file");
-            }
-
-            fileHandle.open(rwMode, syncMode);
-            writeToFile(fileHandle, inputStream, writeBuffer);
-        } catch (IOException e) {
-            throw HyracksDataException.create(e);
-        }
-    }
-
-    private void writeToFile(IFileHandle fileHandle, InputStream inStream, ByteBuffer writeBuffer)
-            throws HyracksDataException {
-        writeBuffer.clear();
-        try {
-            int position = 0;
-            long offset = 0;
-            int read;
-            while ((read = inStream.read(writeBuffer.array(), position, writeBuffer.remaining())) >= 0) {
-                position += read;
-                writeBuffer.position(position);
-                if (writeBuffer.remaining() == 0) {
-                    offset += writeBufferToFile(fileHandle, writeBuffer, offset);
-                    position = 0;
-                }
-            }
-
-            if (writeBuffer.position() > 0) {
-                writeBufferToFile(fileHandle, writeBuffer, offset);
-                syncLocally(fileHandle);
-            }
-        } catch (IOException e) {
-            throw HyracksDataException.create(e);
-        }
-    }
-
-    private long writeBufferToFile(IFileHandle fileHandle, ByteBuffer writeBuffer, long offset)
-            throws HyracksDataException {
-        writeBuffer.flip();
-        long written = writeLocally(fileHandle, offset, writeBuffer);
-        writeBuffer.clear();
-        return written;
-    }
-}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
new file mode 100644
index 0000000..6ba31db
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud;
+
+import org.apache.asterix.common.cloud.CloudCachePolicy;
+import org.apache.asterix.common.cloud.IPartitionBootstrapper;
+import org.apache.asterix.common.config.CloudProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.control.nc.io.IOManager;
+
+public class CloudManagerProvider {
+    private CloudManagerProvider() {
+    }
+
+    public static IIOManager createIOManager(CloudProperties cloudProperties, IIOManager ioManager)
+            throws HyracksDataException {
+        IOManager localIoManager = (IOManager) ioManager;
+        if (cloudProperties.getCloudCachePolicy() == CloudCachePolicy.LAZY) {
+            return new LazyCloudIOManager(localIoManager, cloudProperties);
+        }
+
+        return new EagerCloudIOManager(localIoManager, cloudProperties);
+    }
+
+    public static IPartitionBootstrapper getCloudPartitionBootstrapper(IIOManager ioManager) {
+        if (!(ioManager instanceof AbstractCloudIOManager)) {
+            throw new IllegalArgumentException("Not a cloud IOManager");
+        }
+        return (IPartitionBootstrapper) ioManager;
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
new file mode 100644
index 0000000..0c61957
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud;
+
+import static org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.config.CloudProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * CloudIOManager with eager caching
+ * {@link org.apache.hyracks.api.io.IIOManager} methods are implemented either in
+ * - {@link IOManager}
+ * OR
+ * - {@link AbstractCloudIOManager}
+ */
+class EagerCloudIOManager extends AbstractCloudIOManager {
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    public EagerCloudIOManager(IOManager ioManager, CloudProperties cloudProperties) throws HyracksDataException {
+        super(ioManager, cloudProperties);
+    }
+
+    /*
+     * ******************************************************************
+     * AbstractCloudIOManager functions
+     * ******************************************************************
+     */
+
+    @Override
+    protected void downloadPartitions() throws HyracksDataException {
+        // TODO currently it throws an error in local test
+        Map<String, String> cloudToLocalStoragePaths = new HashMap<>();
+        for (FileReference partitionPath : partitionPaths) {
+            String cloudStoragePath = STORAGE_ROOT_DIR_NAME + "/" + partitionPath.getName();
+            cloudToLocalStoragePaths.put(cloudStoragePath, partitionPath.getAbsolutePath());
+        }
+        LOGGER.info("Resolved paths to io devices: {}", cloudToLocalStoragePaths);
+        cloudClient.syncFiles(bucket, cloudToLocalStoragePaths);
+    }
+
+    @Override
+    protected void onOpen(CloudFileHandle fileHandle, FileReadWriteMode rwMode, FileSyncMode syncMode) {
+        // NoOp
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
new file mode 100644
index 0000000..2232dcd
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud;
+
+import static org.apache.asterix.common.utils.StorageConstants.PARTITION_DIR_PREFIX;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.asterix.cloud.util.CloudFileUtil;
+import org.apache.asterix.common.config.CloudProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * CloudIOManager with lazy caching
+ * - Overrides some of {@link IOManager} functions
+ */
+class LazyCloudIOManager extends AbstractCloudIOManager {
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    public LazyCloudIOManager(IOManager ioManager, CloudProperties cloudProperties) throws HyracksDataException {
+        super(ioManager, cloudProperties);
+    }
+
+    /*
+     * ******************************************************************
+     * AbstractCloudIOManager functions
+     * ******************************************************************
+     */
+
+    @Override
+    protected void downloadPartitions() {
+        // NoOp
+    }
+
+    @Override
+    protected void onOpen(CloudFileHandle fileHandle, FileReadWriteMode rwMode, FileSyncMode syncMode)
+            throws HyracksDataException {
+        FileReference fileRef = fileHandle.getFileReference();
+        if (!localIoManager.exists(fileRef) && cloudClient.exists(bucket, fileRef.getRelativePath())) {
+            // File doesn't exist locally, download it.
+            ByteBuffer writeBuffer = writeBufferProvider.getBuffer();
+            try {
+                // TODO download for all partitions at once
+                LOGGER.info("Downloading {} from S3..", fileRef.getRelativePath());
+                CloudFileUtil.downloadFile(localIoManager, cloudClient, bucket, fileHandle, rwMode, syncMode,
+                        writeBuffer);
+                localIoManager.close(fileHandle);
+                LOGGER.info("Finished downloading {} from S3..", fileRef.getRelativePath());
+            } finally {
+                writeBufferProvider.recycle(writeBuffer);
+            }
+        }
+    }
+
+    /*
+     * ******************************************************************
+     * IIOManager functions
+     * ******************************************************************
+     */
+    @Override
+    public Set<FileReference> list(FileReference dir, FilenameFilter filter) throws HyracksDataException {
+        Set<String> cloudFiles = cloudClient.listObjects(bucket, dir.getRelativePath(), filter);
+        if (cloudFiles.isEmpty()) {
+            return Collections.emptySet();
+        }
+
+        // First get the set of local files
+        Set<FileReference> localFiles = localIoManager.list(dir, filter);
+
+        // Reconcile local files and cloud files
+        for (FileReference file : localFiles) {
+            String path = file.getRelativePath();
+            if (!cloudFiles.contains(path)) {
+                throw new IllegalStateException("Local file is not clean");
+            } else {
+                // No need to re-add it in the following loop
+                cloudFiles.remove(path);
+            }
+        }
+
+        // Add the remaining files that are not stored locally in their designated partitions (if any)
+        for (String cloudFile : cloudFiles) {
+            FileReference localFile = resolve(cloudFile);
+            if (isInNodePartition(cloudFile) && dir.getDeviceHandle().equals(localFile.getDeviceHandle())) {
+                localFiles.add(localFile);
+            }
+        }
+        return new HashSet<>(localFiles);
+    }
+
+    @Override
+    public long getSize(FileReference fileReference) throws HyracksDataException {
+        if (localIoManager.exists(fileReference)) {
+            return localIoManager.getSize(fileReference);
+        }
+        return cloudClient.getObjectSize(bucket, fileReference.getRelativePath());
+    }
+
+    @Override
+    public byte[] readAllBytes(FileReference fileRef) throws HyracksDataException {
+        if (!localIoManager.exists(fileRef) && isInNodePartition(fileRef.getRelativePath())) {
+            byte[] bytes = cloudClient.readAllBytes(bucket, fileRef.getRelativePath());
+            if (bytes != null && !partitions.isEmpty()) {
+                localIoManager.overwrite(fileRef, bytes);
+            }
+            return bytes;
+        }
+        return localIoManager.readAllBytes(fileRef);
+    }
+
+    private boolean isInNodePartition(String path) {
+        int start = path.indexOf(PARTITION_DIR_PREFIX) + PARTITION_DIR_PREFIX.length();
+        int length = path.indexOf(File.separatorChar, start);
+        return partitions.contains(Integer.parseInt(path.substring(start, length)));
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LocalPartitionBootstrapper.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LocalPartitionBootstrapper.java
new file mode 100644
index 0000000..db7b6d6
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LocalPartitionBootstrapper.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.cloud;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.asterix.common.cloud.IPartitionBootstrapper;
+import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class LocalPartitionBootstrapper implements IPartitionBootstrapper {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final IIOManager ioManager;
+
+    public LocalPartitionBootstrapper(IIOManager ioManager) {
+        this.ioManager = ioManager;
+    }
+
+    @Override
+    public IRecoveryManager.SystemState getSystemStateOnMissingCheckpoint() {
+        //The checkpoint file doesn't exist => Failure happened during NC initialization.
+        //Retry to initialize the NC by setting the state to PERMANENT_DATA_LOSS
+        LOGGER.info("The checkpoint file doesn't exist: systemState = PERMANENT_DATA_LOSS");
+        return IRecoveryManager.SystemState.PERMANENT_DATA_LOSS;
+    }
+
+    @Override
+    public void bootstrap(Set<Integer> activePartitions, List<FileReference> currentOnDiskPartitions,
+            boolean metadataNode, int metadataPartition, boolean cleanup) throws HyracksDataException {
+        for (FileReference onDiskPartition : currentOnDiskPartitions) {
+            int partitionNum = StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
+            if (!activePartitions.contains(partitionNum)) {
+                LOGGER.warn("deleting partition {} since it is not on partitions to keep {}", partitionNum,
+                        activePartitions);
+                ioManager.delete(onDiskPartition);
+            }
+        }
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/util/CloudFileUtil.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/util/CloudFileUtil.java
new file mode 100644
index 0000000..79ddba1
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/util/CloudFileUtil.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.control.nc.io.FileHandle;
+import org.apache.hyracks.control.nc.io.IOManager;
+
+public class CloudFileUtil {
+    private CloudFileUtil() {
+    }
+
+    public static void downloadFile(IOManager ioManager, ICloudClient cloudClient, String bucket, FileHandle fileHandle,
+            IIOManager.FileReadWriteMode rwMode, IIOManager.FileSyncMode syncMode, ByteBuffer writeBuffer)
+            throws HyracksDataException {
+        FileReference fileRef = fileHandle.getFileReference();
+        File file = fileRef.getFile();
+
+        try (InputStream inputStream = cloudClient.getObjectStream(bucket, fileRef.getRelativePath())) {
+            FileUtils.createParentDirectories(file);
+            if (!file.createNewFile()) {
+                throw new IllegalStateException("Couldn't create local file");
+            }
+
+            fileHandle.open(rwMode, syncMode);
+            writeToFile(ioManager, fileHandle, inputStream, writeBuffer);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    public static void cleanDirectoryFiles(IOManager ioManager, Set<String> cloudFiles, FileReference partitionPath)
+            throws HyracksDataException {
+        // First get the set of local files
+        Set<FileReference> localFiles = ioManager.list(partitionPath);
+        Iterator<FileReference> localFilesIter = localFiles.iterator();
+
+        // Reconcile local files and cloud files
+        while (localFilesIter.hasNext()) {
+            FileReference file = localFilesIter.next();
+            if (file.getFile().isDirectory()) {
+                continue;
+            }
+
+            String path = file.getRelativePath();
+            if (!cloudFiles.contains(path)) {
+                // Delete local files that do not exist in cloud storage (the ground truth for valid files)
+                localFilesIter.remove();
+                ioManager.delete(file);
+            } else {
+                // No need to re-add it in the following loop
+                cloudFiles.remove(path);
+            }
+        }
+
+        // Add the remaining files that are not stored locally (if any)
+        for (String cloudFile : cloudFiles) {
+            if (!cloudFile.contains(partitionPath.getRelativePath())) {
+                continue;
+            }
+            localFiles.add(new FileReference(partitionPath.getDeviceHandle(),
+                    cloudFile.substring(cloudFile.indexOf(partitionPath.getRelativePath()))));
+        }
+    }
+
+    private static void writeToFile(IOManager ioManager, IFileHandle fileHandle, InputStream inStream,
+            ByteBuffer writeBuffer) throws HyracksDataException {
+        writeBuffer.clear();
+        try {
+            int position = 0;
+            long offset = 0;
+            int read;
+            while ((read = inStream.read(writeBuffer.array(), position, writeBuffer.remaining())) >= 0) {
+                position += read;
+                writeBuffer.position(position);
+                if (writeBuffer.remaining() == 0) {
+                    offset += writeBufferToFile(ioManager, fileHandle, writeBuffer, offset);
+                    position = 0;
+                }
+            }
+
+            if (writeBuffer.position() > 0) {
+                writeBufferToFile(ioManager, fileHandle, writeBuffer, offset);
+                ioManager.sync(fileHandle, true);
+            }
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private static long writeBufferToFile(IOManager ioManager, IFileHandle fileHandle, ByteBuffer writeBuffer,
+            long offset) throws HyracksDataException {
+        writeBuffer.flip();
+        long written = ioManager.doSyncWrite(fileHandle, offset, writeBuffer);
+        writeBuffer.clear();
+        return written;
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 23487a9..8e6becf 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -22,6 +22,7 @@
 import java.rmi.RemoteException;
 import java.util.concurrent.Executor;
 
+import org.apache.asterix.common.cloud.IPartitionBootstrapper;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.common.replication.IReplicationChannel;
@@ -51,6 +52,8 @@
 
     IIOManager getPersistenceIoManager();
 
+    IPartitionBootstrapper getPartitionBootstrapper();
+
     Executor getThreadExecutor();
 
     ITransactionSubsystem getTransactionSubsystem();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/CloudCachePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/CloudCachePolicy.java
new file mode 100644
index 0000000..c6858c5
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/CloudCachePolicy.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cloud;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum CloudCachePolicy {
+    EAGER("eager"),
+    LAZY("lazy");
+    private static final Map<String, CloudCachePolicy> partitioningSchemes =
+            Collections.unmodifiableMap(Arrays.stream(CloudCachePolicy.values())
+                    .collect(Collectors.toMap(CloudCachePolicy::getPolicyName, Function.identity())));
+
+    private final String policyName;
+
+    CloudCachePolicy(String policyName) {
+        this.policyName = policyName;
+    }
+
+    public String getPolicyName() {
+        return policyName;
+    }
+
+    public static CloudCachePolicy fromName(String policyName) {
+        CloudCachePolicy partitioningScheme = partitioningSchemes.get(policyName.toLowerCase());
+        if (partitioningScheme == null) {
+            throw new IllegalArgumentException("unknown cloud cache policy: " + policyName);
+        }
+        return partitioningScheme;
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java
new file mode 100644
index 0000000..fd9a1b3
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cloud;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+
+/**
+ * Bootstraps a node's partitions directories
+ */
+public interface IPartitionBootstrapper {
+
+    /**
+     * @return the systems state in case the checkpoint upon calling {@link IRecoveryManager#getSystemState()}
+     * is missing the checkpoint file
+     */
+    IRecoveryManager.SystemState getSystemStateOnMissingCheckpoint();
+
+    /**
+     * Bootstraps the node's partitions directory by doing the following:
+     * - Deleting the directories in <code>currentOnDiskPartitions</code> that are not in <code>activePartitions</code>
+     * - In cloud deployment only, it does the following:
+     * This will do the following:
+     * - Cleanup all local files (i.e., delete all local files that do not exist in the cloud storage)
+     * - Depends on the caching policy:
+     * - - Eager: Will download the contents of all active partitions
+     * - - Lazy: No file will be downloaded at start, but will be when opened
+     *
+     * @param activePartitions        the current active storage partitions of the NC
+     * @param currentOnDiskPartitions paths to the current local partitions
+     * @param metadataNode            whether the node is a metadata node as well
+     * @param metadataPartition       metadata partition number
+     * @param cleanup                 performs cleanup by deleting all unkept partitions
+     */
+    void bootstrap(Set<Integer> activePartitions, List<FileReference> currentOnDiskPartitions, boolean metadataNode,
+            int metadataPartition, boolean cleanup) throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index 5f714f4..343a23d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -160,8 +160,7 @@
     /**
      * Returns the IO devices configured for a Node Controller
      *
-     * @param nodeId
-     *            unique identifier of the Node Controller
+     * @param nodeId unique identifier of the Node Controller
      * @return a list of IO devices.
      */
     String[] getIODevices(String nodeId);
@@ -172,15 +171,13 @@
     AlgebricksAbsolutePartitionConstraint getClusterLocations();
 
     /**
-     * @param excludePendingRemoval
-     *            true, if the desired set shouldn't have pending removal nodes
+     * @param excludePendingRemoval true, if the desired set shouldn't have pending removal nodes
      * @return the set of participant nodes
      */
     Set<String> getParticipantNodes(boolean excludePendingRemoval);
 
     /**
-     * @param node
-     *            the node id
+     * @param node the node id
      * @return the number of partitions on that node
      */
     int getNodePartitionsCount(String node);
@@ -262,6 +259,7 @@
 
     /**
      * Indicate whether one or more datasets must be rebalanced before the cluster becomes ACTIVE
+     *
      * @param rebalanceRequired
      */
     void setRebalanceRequired(boolean rebalanceRequired) throws HyracksDataException;
@@ -283,13 +281,20 @@
 
     /**
      * Gets the count of storage partitions
+     *
      * @return the count of storage partitions
      */
     int getStoragePartitionsCount();
 
     /**
+     * @return the current compute-storage partitions map
+     */
+    StorageComputePartitionsMap getStorageComputeMap();
+
+    /**
      * Sets the compute-storage partitions map
-     * @param map
+     *
+     * @param map the new map
      */
     void setComputeStoragePartitionsMap(StorageComputePartitionsMap map);
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
index 6561d05..a16204d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -28,7 +29,7 @@
 
 public class StorageComputePartitionsMap {
 
-    private final Map<Integer, ComputePartition> stoToComputeLocation = new HashMap<>();
+    private final Map<Integer, ComputePartition> storageComputeMap = new HashMap<>();
     private final int storagePartitionsCount;
 
     public StorageComputePartitionsMap(int storagePartitionsCount) {
@@ -36,7 +37,7 @@
     }
 
     public void addStoragePartition(int stoPart, ComputePartition compute) {
-        stoToComputeLocation.put(stoPart, compute);
+        storageComputeMap.put(stoPart, compute);
     }
 
     public int[][] getComputeToStorageMap(boolean metadataDataset) {
@@ -63,8 +64,34 @@
         return computerToStoArray;
     }
 
+    public int getStoragePartitionsCount() {
+        return storagePartitionsCount;
+    }
+
     public ComputePartition getComputePartition(int storagePartition) {
-        return stoToComputeLocation.get(storagePartition);
+        return storageComputeMap.get(storagePartition);
+    }
+
+    public Set<String> getComputeNodes() {
+        return storageComputeMap.values().stream().map(ComputePartition::getNodeId).collect(Collectors.toSet());
+    }
+
+    /**
+     * For a set of compute partitions, return a set of their corresponding storage partitions
+     *
+     * @param computePartitions the current active compute partitions
+     * @return computePartitions's corresponding storage partitions
+     */
+    public Set<Integer> getStoragePartitions(Set<Integer> computePartitions) {
+        Set<Integer> storagePartitions = new HashSet<>();
+        for (Map.Entry<Integer, ComputePartition> entry : storageComputeMap.entrySet()) {
+            ComputePartition computePartition = entry.getValue();
+            if (computePartitions.contains(computePartition.getId())) {
+                storagePartitions.add(entry.getKey());
+            }
+        }
+
+        return storagePartitions;
     }
 
     public static StorageComputePartitionsMap computePartitionsMap(IClusterStateManager clusterStateManager) {
@@ -97,8 +124,4 @@
         }
         return newMap;
     }
-
-    public Set<String> getComputeNodes() {
-        return stoToComputeLocation.values().stream().map(ComputePartition::getNodeId).collect(Collectors.toSet());
-    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
index a480167..90c3acb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
@@ -21,6 +21,7 @@
 import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
 import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
 
+import org.apache.asterix.common.cloud.CloudCachePolicy;
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.config.IOptionType;
 import org.apache.hyracks.api.config.Section;
@@ -37,7 +38,8 @@
         CLOUD_STORAGE_PREFIX(STRING, ""),
         CLOUD_STORAGE_REGION(STRING, ""),
         CLOUD_STORAGE_ENDPOINT(STRING, ""),
-        CLOUD_STORAGE_ANONYMOUS_AUTH(BOOLEAN, false);
+        CLOUD_STORAGE_ANONYMOUS_AUTH(BOOLEAN, false),
+        CLOUD_STORAGE_CACHE_POLICY(STRING, "lazy");
 
         private final IOptionType interpreter;
         private final Object defaultValue;
@@ -56,6 +58,7 @@
                 case CLOUD_STORAGE_REGION:
                 case CLOUD_STORAGE_ENDPOINT:
                 case CLOUD_STORAGE_ANONYMOUS_AUTH:
+                case CLOUD_STORAGE_CACHE_POLICY:
                     return Section.COMMON;
                 default:
                     return Section.NC;
@@ -77,6 +80,10 @@
                     return "The cloud storage endpoint";
                 case CLOUD_STORAGE_ANONYMOUS_AUTH:
                     return "Indicates whether or not anonymous auth should be used for the cloud storage";
+                case CLOUD_STORAGE_CACHE_POLICY:
+                    return "The caching policy (either eager or lazy). 'Eager' caching will download all partitions"
+                            + " upon booting, whereas lazy caching will download a file upon request to open it."
+                            + " (default: 'lazy')";
                 default:
                     throw new IllegalStateException("NYI: " + this);
             }
@@ -117,4 +124,8 @@
     public boolean isStorageAnonymousAuth() {
         return accessor.getBoolean(Option.CLOUD_STORAGE_ANONYMOUS_AUTH);
     }
+
+    public CloudCachePolicy getCloudCachePolicy() {
+        return CloudCachePolicy.fromName(accessor.getString(Option.CLOUD_STORAGE_CACHE_POLICY));
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 0128478..4b36b20 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -507,6 +507,7 @@
         return appCtx.getStorageProperties().getStoragePartitionsCount();
     }
 
+    @Override
     public synchronized StorageComputePartitionsMap getStorageComputeMap() {
         return storageComputePartitionsMap;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
index 10f1637..95dcb2f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
@@ -126,6 +126,7 @@
 
     /**
      * Lists the files matching {@code filter} recursively starting from {@code dir}
+     *
      * @param dir
      * @param filter
      * @return the matching files
@@ -149,6 +150,4 @@
     boolean makeDirectories(FileReference resourceDir);
 
     void cleanDirectory(FileReference resourceDir) throws HyracksDataException;
-
-    void syncFiles(Set<Integer> activePartitions) throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IODeviceHandle.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IODeviceHandle.java
index c826c2f..c160f67 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IODeviceHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IODeviceHandle.java
@@ -41,10 +41,8 @@
     private final String workspace;
 
     /**
-     * @param mount
-     *            The device root
-     * @param workspace
-     *            The relative workspace inside the device
+     * @param mount     The device root
+     * @param workspace The relative workspace inside the device
      */
     public IODeviceHandle(File mount, String workspace) {
         this.mount = mount;
@@ -63,8 +61,7 @@
     /**
      * Create a file reference
      *
-     * @param relPath
-     *            the relative path
+     * @param relPath the relative path
      * @return
      */
     public FileReference createFileRef(String relPath) {
@@ -74,8 +71,7 @@
     /**
      * Get handles for IO devices
      *
-     * @param ioDevices
-     *            comma separated list of devices
+     * @param ioDevices comma separated list of devices
      * @return
      */
     public static List<IODeviceHandle> getDevices(String[] ioDevices) {
@@ -97,11 +93,13 @@
             throw new HyracksDataException(
                     "Passed path: " + absolutePath + " is not inside the device " + mount.getAbsolutePath());
         }
-        return absolutePath.substring(mount.getAbsolutePath().length());
+        // +1 to remove the leading '/'
+        return absolutePath.substring(mount.getAbsolutePath().length() + 1);
     }
 
     /**
      * determinea if the device contains a file with the passed relative path
+     *
      * @param relPath
      * @return true if it contains, false, otherwise
      */
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
index 05e6544..fcf8c7a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
@@ -206,8 +206,7 @@
         List<FileReference> files = new ArrayList<>();
         String[] matchingFiles = root.getFile().list(filter);
         if (matchingFiles != null) {
-            files.addAll(Arrays.stream(matchingFiles).map(pDir -> new FileReference(root.getDeviceHandle(), pDir))
-                    .collect(Collectors.toList()));
+            files.addAll(Arrays.stream(matchingFiles).map(root::getChild).collect(Collectors.toList()));
         }
         return files;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 1733ad4..0e6578f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -115,14 +115,10 @@
         return queueSize;
     }
 
-    public int getIoParallelism() {
+    public int getIOParallelism() {
         return ioParallelism;
     }
 
-    public List<IODeviceHandle> getIoDevices() {
-        return ioDevices;
-    }
-
     public IFileDeviceResolver getDeviceComputer() {
         return deviceComputer;
     }
@@ -529,7 +525,7 @@
     }
 
     @Override
-    public void overwrite(FileReference fileRef, byte[] bytes) throws ClosedByInterruptException, HyracksDataException {
+    public void overwrite(FileReference fileRef, byte[] bytes) throws HyracksDataException {
         File file = fileRef.getFile();
         try {
             if (file.exists()) {
@@ -596,9 +592,4 @@
             throw HyracksDataException.create(e);
         }
     }
-
-    @Override
-    public void syncFiles(Set<Integer> activePartitions) throws HyracksDataException {
-        // do nothing
-    }
 }