[ASTERIXDB-3387][STO] Enable Selective caching policy

- user model changes: yes
- storage format changes: no
- interface changes: no

Details:
Allow the user to enable/select the 'selective' caching policy. An illustrative configuration sketch is included below.
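
A minimal sketch, assuming the usual ini-style configuration file and that
these options live in the [common] section (option names are taken from the
new CloudProperties options added by this change; exact file layout may
differ):

  [common]
  cloud.deployment=true
  cloud.storage.cache.policy=selective
  cloud.storage.allocation.percentage=0.8
  cloud.storage.sweep.threshold.percentage=0.9
  cloud.storage.disk.monitor.interval=60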

Change-Id: Ia212db8c1673f6b725eb21fe7035a1e20490d1f6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18291
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index cd52b9d..1d6a0d8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -33,7 +33,7 @@
 
 import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.cloud.CloudManagerProvider;
+import org.apache.asterix.cloud.CloudConfigurator;
 import org.apache.asterix.cloud.LocalPartitionBootstrapper;
 import org.apache.asterix.common.api.IConfigValidator;
 import org.apache.asterix.common.api.IConfigValidatorFactory;
@@ -122,6 +122,10 @@
 import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
 import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
 import org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
+import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
+import org.apache.hyracks.storage.common.disk.NoOpDiskCacheMonitoringService;
+import org.apache.hyracks.storage.common.disk.NoOpDiskResourceCacheLockNotifier;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 import org.apache.hyracks.storage.common.file.FileMapManager;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
@@ -180,6 +184,7 @@
     private IPartitionBootstrapper partitionBootstrapper;
     private final INamespacePathResolver namespacePathResolver;
     private final INamespaceResolver namespaceResolver;
+    private IDiskCacheMonitoringService diskCacheService;
 
     public NCAppRuntimeContext(INCServiceContext ncServiceContext, NCExtensionManager extensionManager,
             IPropertiesFactory propertiesFactory, INamespaceResolver namespaceResolver,
@@ -211,16 +216,26 @@
             IConfigValidatorFactory configValidatorFactory, IReplicationStrategyFactory replicationStrategyFactory,
             boolean initialRun) throws IOException {
         ioManager = getServiceContext().getIoManager();
-        IDiskCachedPageAllocator pageAllocator = DefaultDiskCachedPageAllocator.INSTANCE;
-        IBufferCacheReadContext defaultContext = DefaultBufferCacheReadContextProvider.DEFAULT;
+        CloudConfigurator cloudConfigurator;
+        IDiskResourceCacheLockNotifier lockNotifier;
+        IDiskCachedPageAllocator pageAllocator;
+        IBufferCacheReadContext defaultContext;
         if (isCloudDeployment()) {
-            persistenceIOManager =
-                    CloudManagerProvider.createIOManager(cloudProperties, ioManager, namespacePathResolver);
-            partitionBootstrapper = CloudManagerProvider.getCloudPartitionBootstrapper(persistenceIOManager);
+            cloudConfigurator = CloudConfigurator.of(cloudProperties, ioManager, namespacePathResolver);
+            persistenceIOManager = cloudConfigurator.getCloudIoManager();
+            partitionBootstrapper = cloudConfigurator.getPartitionBootstrapper();
+            lockNotifier = cloudConfigurator.getLockNotifier();
+            pageAllocator = cloudConfigurator.getPageAllocator();
+            defaultContext = cloudConfigurator.getDefaultContext();
         } else {
+            cloudConfigurator = null;
             persistenceIOManager = ioManager;
             partitionBootstrapper = new LocalPartitionBootstrapper(ioManager);
+            lockNotifier = NoOpDiskResourceCacheLockNotifier.INSTANCE;
+            pageAllocator = DefaultDiskCachedPageAllocator.INSTANCE;
+            defaultContext = DefaultBufferCacheReadContextProvider.DEFAULT;
         }
+
         int ioQueueLen = getServiceContext().getAppConfig().getInt(NCConfig.Option.IO_QUEUE_SIZE);
         threadExecutor =
                 MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory());
@@ -261,9 +276,8 @@
         // Must start vbc now instead of by life cycle component manager (lccm) because lccm happens after
         // the metadata bootstrap task
         ((ILifeCycleComponent) virtualBufferCache).start();
-        datasetLifecycleManager =
-                new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(),
-                        virtualBufferCache, indexCheckpointManagerProvider, ioManager.getIODevices().size());
+        datasetLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository,
+                txnSubsystem.getLogManager(), virtualBufferCache, indexCheckpointManagerProvider, lockNotifier);
         localResourceRepository.setDatasetLifecycleManager(datasetLifecycleManager);
         final String nodeId = getServiceContext().getNodeId();
         final Set<Integer> nodePartitions = metadataProperties.getNodePartitions(nodeId);
@@ -300,6 +314,15 @@
                     fileInfoMap, defaultContext);
         }
 
+        if (cloudConfigurator != null) {
+            diskCacheService =
+                    cloudConfigurator.createDiskCacheMonitoringService(getServiceContext(), bufferCache, fileInfoMap);
+        } else {
+            diskCacheService = NoOpDiskCacheMonitoringService.INSTANCE;
+        }
+
+        diskCacheService.start();
+
         NodeControllerService ncs = (NodeControllerService) getServiceContext().getControllerService();
         FileReference appDir =
                 ioManager.resolveAbsolutePath(getServiceContext().getServerCtx().getAppDir().getAbsolutePath());
@@ -351,6 +374,7 @@
     @Override
     public synchronized void preStop() throws Exception {
         activeManager.shutdown();
+        diskCacheService.stop();
         if (metadataNodeStub != null) {
             unexportMetadataNodeStub();
         }
@@ -695,6 +719,11 @@
     }
 
     @Override
+    public IDiskCacheMonitoringService getDiskCacheService() {
+        return diskCacheService;
+    }
+
+    @Override
     public boolean isCloudDeployment() {
         return ncServiceContext.getAppConfig().getBoolean(CLOUD_DEPLOYMENT);
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
index 6b3257d..547bc8b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
@@ -63,6 +63,8 @@
         IPartitionBootstrapper bootstrapper = applicationContext.getPartitionBootstrapper();
         bootstrapper.bootstrap(storagePartitions, lrs.getOnDiskPartitions(), metadataNode, metadataPartitionId, cleanup,
                 latestCheckpoint == null);
+        // Report all local resources
+        applicationContext.getDiskCacheService().reportLocalResources(lrs.loadAndGetAllResources());
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index ba3ffcd..cb8b8e5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -61,7 +61,7 @@
 import org.apache.asterix.app.io.PersistedResourceRegistry;
 import org.apache.asterix.app.replication.NcLifecycleCoordinator;
 import org.apache.asterix.app.result.JobResultCallback;
-import org.apache.asterix.cloud.CloudManagerProvider;
+import org.apache.asterix.cloud.CloudConfigurator;
 import org.apache.asterix.common.api.AsterixThreadFactory;
 import org.apache.asterix.common.api.IConfigValidatorFactory;
 import org.apache.asterix.common.api.INamespacePathResolver;
@@ -186,8 +186,7 @@
         CloudProperties cloudProperties = null;
         if (cloudDeployment) {
             cloudProperties = new CloudProperties(PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig()));
-            ioManager =
-                    (IOManager) CloudManagerProvider.createIOManager(cloudProperties, ioManager, namespacePathResolver);
+            ioManager = CloudConfigurator.createIOManager(ioManager, cloudProperties, namespacePathResolver);
         }
         IGlobalTxManager globalTxManager = createGlobalTxManager(ioManager);
         appCtx = createApplicationContext(null, globalRecoveryManager, lifecycleCoordinator, Receptionist::new,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 6888acc..4c06994 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -11,13 +11,19 @@
     "azure.request.timeout" : 120,
     "cloud.deployment" : false,
     "cloud.profiler.log.interval" : 0,
+    "cloud.storage.allocation.percentage" : 0.8,
     "cloud.storage.anonymous.auth" : false,
     "cloud.storage.bucket" : "",
     "cloud.storage.cache.policy" : "lazy",
+    "cloud.storage.debug.mode.enabled" : false,
+    "cloud.storage.debug.sweep.threshold.size" : 1073741824,
+    "cloud.storage.disk.monitor.interval" : 60,
     "cloud.storage.endpoint" : "",
+    "cloud.storage.index.inactive.duration.threshold" : 6,
     "cloud.storage.prefix" : "",
     "cloud.storage.region" : "",
     "cloud.storage.scheme" : "",
+    "cloud.storage.sweep.threshold.percentage" : 0.9,
     "compiler\.arrayindex" : true,
     "compiler.batch.lookup" : true,
     "compiler.cbo" : true,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 31daf99..1d64ade 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -11,13 +11,19 @@
     "azure.request.timeout" : 120,
     "cloud.deployment" : false,
     "cloud.profiler.log.interval" : 0,
+    "cloud.storage.allocation.percentage" : 0.8,
     "cloud.storage.anonymous.auth" : false,
     "cloud.storage.bucket" : "",
     "cloud.storage.cache.policy" : "lazy",
+    "cloud.storage.debug.mode.enabled" : false,
+    "cloud.storage.debug.sweep.threshold.size" : 1073741824,
+    "cloud.storage.disk.monitor.interval" : 60,
     "cloud.storage.endpoint" : "",
+    "cloud.storage.index.inactive.duration.threshold" : 6,
     "cloud.storage.prefix" : "",
     "cloud.storage.region" : "",
     "cloud.storage.scheme" : "",
+    "cloud.storage.sweep.threshold.percentage" : 0.9,
     "compiler\.arrayindex" : true,
     "compiler.batch.lookup" : true,
     "compiler.cbo" : true,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index a47b3be..aff2c77 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -11,13 +11,19 @@
     "azure.request.timeout" : 120,
     "cloud.deployment" : false,
     "cloud.profiler.log.interval" : 0,
+    "cloud.storage.allocation.percentage" : 0.8,
     "cloud.storage.anonymous.auth" : false,
     "cloud.storage.bucket" : "",
     "cloud.storage.cache.policy" : "lazy",
+    "cloud.storage.debug.mode.enabled" : false,
+    "cloud.storage.debug.sweep.threshold.size" : 1073741824,
+    "cloud.storage.disk.monitor.interval" : 60,
     "cloud.storage.endpoint" : "",
+    "cloud.storage.index.inactive.duration.threshold" : 6,
     "cloud.storage.prefix" : "",
     "cloud.storage.region" : "",
     "cloud.storage.scheme" : "",
+    "cloud.storage.sweep.threshold.percentage" : 0.9,
     "compiler\.arrayindex" : true,
     "compiler.batch.lookup" : true,
     "compiler.cbo" : true,
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
new file mode 100644
index 0000000..a0c1fbb
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.asterix.common.api.INamespacePathResolver;
+import org.apache.asterix.common.cloud.CloudCachePolicy;
+import org.apache.asterix.common.cloud.IPartitionBootstrapper;
+import org.apache.asterix.common.config.CloudProperties;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.cloud.buffercache.context.DefaultCloudReadContext;
+import org.apache.hyracks.cloud.buffercache.page.CloudDiskCachedPageAllocator;
+import org.apache.hyracks.cloud.cache.service.CloudDiskCacheMonitoringAndPrefetchingService;
+import org.apache.hyracks.cloud.cache.service.CloudDiskResourceCacheLockNotifier;
+import org.apache.hyracks.cloud.cache.service.DiskCacheSweeperThread;
+import org.apache.hyracks.cloud.filesystem.PhysicalDrive;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.buffercache.DefaultDiskCachedPageAllocator;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.IDiskCachedPageAllocator;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider;
+import org.apache.hyracks.storage.common.disk.DummyPhysicalDrive;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
+import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.disk.NoOpDiskCacheMonitoringService;
+import org.apache.hyracks.storage.common.disk.NoOpDiskResourceCacheLockNotifier;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+
+public final class CloudConfigurator {
+    private final CloudProperties cloudProperties;
+    private final IOManager localIoManager;
+    private final AbstractCloudIOManager cloudIOManager;
+    private final IPhysicalDrive physicalDrive;
+    private final IDiskResourceCacheLockNotifier lockNotifier;
+    private final IDiskCachedPageAllocator pageAllocator;
+    private final IBufferCacheReadContext defaultContext;
+    private final boolean diskCacheManagerRequired;
+    private final long diskCacheMonitoringInterval;
+
+    private CloudConfigurator(CloudProperties cloudProperties, IIOManager ioManager,
+            INamespacePathResolver nsPathResolver) throws HyracksDataException {
+        this.cloudProperties = cloudProperties;
+        localIoManager = (IOManager) ioManager;
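+        // The disk cache monitoring/sweeping machinery is only required for the 'selective' caching policy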
+        diskCacheManagerRequired = cloudProperties.getCloudCachePolicy() == CloudCachePolicy.SELECTIVE;
+        cloudIOManager = createIOManager(ioManager, cloudProperties, nsPathResolver);
+        physicalDrive = createPhysicalDrive(diskCacheManagerRequired, cloudProperties, ioManager);
+        lockNotifier = createLockNotifier(diskCacheManagerRequired);
+        pageAllocator = createPageAllocator(diskCacheManagerRequired);
+        defaultContext = createDefaultBufferCachePageOpContext(diskCacheManagerRequired, physicalDrive);
+        diskCacheMonitoringInterval = cloudProperties.getStorageDiskMonitorInterval();
+    }
+
+    public IPartitionBootstrapper getPartitionBootstrapper() {
+        return cloudIOManager;
+    }
+
+    public IIOManager getCloudIoManager() {
+        return cloudIOManager;
+    }
+
+    public IDiskResourceCacheLockNotifier getLockNotifier() {
+        return lockNotifier;
+    }
+
+    public IDiskCachedPageAllocator getPageAllocator() {
+        return pageAllocator;
+    }
+
+    public IBufferCacheReadContext getDefaultContext() {
+        return defaultContext;
+    }
+
+    public IDiskCacheMonitoringService createDiskCacheMonitoringService(INCServiceContext serviceContext,
+            IBufferCache bufferCache, Map<Integer, BufferedFileHandle> fileInfoMap) {
+        if (!diskCacheManagerRequired) {
+            return NoOpDiskCacheMonitoringService.INSTANCE;
+        }
+
+        CloudDiskResourceCacheLockNotifier resourceCacheManager = (CloudDiskResourceCacheLockNotifier) lockNotifier;
+        BufferCache diskBufferCache = (BufferCache) bufferCache;
+        int numOfIoDevices = localIoManager.getIODevices().size();
+        IApplicationConfig appConfig = serviceContext.getAppConfig();
+        int ioParallelism = appConfig.getInt(NCConfig.Option.IO_WORKERS_PER_PARTITION);
+        int sweepQueueSize = appConfig.getInt(NCConfig.Option.IO_QUEUE_SIZE);
+        int numOfSweepThreads = ioParallelism * numOfIoDevices;
+        // Ensure each sweep thread has at least one entry in the queue
+        int maxSweepQueueSize = Math.max(numOfSweepThreads, sweepQueueSize);
+        long inactiveThreshold = cloudProperties.getStorageIndexInactiveDurationThreshold();
+        // +1 for the monitorThread
+        ExecutorService executor = Executors.newFixedThreadPool(numOfSweepThreads + 1);
+        DiskCacheSweeperThread monitorThread = new DiskCacheSweeperThread(executor, diskCacheMonitoringInterval,
+                resourceCacheManager, cloudIOManager, numOfSweepThreads, maxSweepQueueSize, physicalDrive,
+                diskBufferCache, fileInfoMap, inactiveThreshold);
+
+        IDiskCacheMonitoringService diskCacheService =
+                new CloudDiskCacheMonitoringAndPrefetchingService(executor, physicalDrive, monitorThread);
+        localIoManager.setSpaceMaker(monitorThread);
+        return diskCacheService;
+    }
+
+    public static CloudConfigurator of(CloudProperties cloudProperties, IIOManager ioManager,
+            INamespacePathResolver nsPathResolver) throws HyracksDataException {
+        return new CloudConfigurator(cloudProperties, ioManager, nsPathResolver);
+    }
+
+    public static AbstractCloudIOManager createIOManager(IIOManager ioManager, CloudProperties cloudProperties,
+            INamespacePathResolver nsPathResolver) throws HyracksDataException {
+        IOManager localIoManager = (IOManager) ioManager;
+        CloudCachePolicy policy = cloudProperties.getCloudCachePolicy();
+        if (policy == CloudCachePolicy.EAGER) {
+            return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver);
+        }
+
+        boolean selective = policy == CloudCachePolicy.SELECTIVE;
+        return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, selective);
+    }
+
+    private static IPhysicalDrive createPhysicalDrive(boolean diskCacheManagerRequired, CloudProperties cloudProperties,
+            IIOManager ioManager) throws HyracksDataException {
+        if (diskCacheManagerRequired) {
+            double storagePercentage = cloudProperties.getStorageAllocationPercentage();
+            double pressureThreshold = cloudProperties.getStorageSweepThresholdPercentage();
+            long pressureDebugSize = cloudProperties.getStorageDebugSweepThresholdSize();
+            return new PhysicalDrive(ioManager.getIODevices(), pressureThreshold, storagePercentage, pressureDebugSize);
+        }
+
+        return DummyPhysicalDrive.INSTANCE;
+    }
+
+    private static IDiskResourceCacheLockNotifier createLockNotifier(boolean diskCacheManagerRequired) {
+        if (diskCacheManagerRequired) {
+            return new CloudDiskResourceCacheLockNotifier(StorageConstants.METADATA_PARTITION);
+        }
+
+        return NoOpDiskResourceCacheLockNotifier.INSTANCE;
+    }
+
+    private static IDiskCachedPageAllocator createPageAllocator(boolean diskCacheManagerRequired) {
+        if (diskCacheManagerRequired) {
+            return CloudDiskCachedPageAllocator.INSTANCE;
+        }
+        return DefaultDiskCachedPageAllocator.INSTANCE;
+    }
+
+    private static IBufferCacheReadContext createDefaultBufferCachePageOpContext(boolean diskCacheManagerRequired,
+            IPhysicalDrive drive) {
+        if (diskCacheManagerRequired) {
+            return new DefaultCloudReadContext(drive);
+        }
+
+        return DefaultBufferCacheReadContextProvider.DEFAULT;
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
index 04979e6..c993cd3 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
@@ -102,7 +102,7 @@
     }
 
     @Override
-    public void evict(FileReference directory) {
+    public void evict(String resourcePath) {
         throw new UnsupportedOperationException("evict is not supported with Eager caching");
     }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 612237a..afe0878 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -68,11 +68,11 @@
     private ILazyAccessor accessor;
 
     public LazyCloudIOManager(IOManager ioManager, CloudProperties cloudProperties,
-            INamespacePathResolver nsPathResolver, boolean replaceableAccessor) throws HyracksDataException {
+            INamespacePathResolver nsPathResolver, boolean selective) throws HyracksDataException {
         super(ioManager, cloudProperties, nsPathResolver);
         accessor = new InitialCloudAccessor(cloudClient, bucket, localIoManager);
         puncher = HolePuncherProvider.get(this, cloudProperties, writeBufferProvider);
-        if (replaceableAccessor) {
+        if (selective) {
             replacer = InitialCloudAccessor.NO_OP_REPLACER;
         } else {
             replacer = () -> {
@@ -208,8 +208,8 @@
     }
 
     @Override
-    public void evict(FileReference directory) throws HyracksDataException {
-        accessor.doEvict(directory);
+    public void evict(String resourcePath) throws HyracksDataException {
+        accessor.doEvict(resolve(resourcePath));
     }
 
     private List<FileReference> resolve(Set<CloudFile> cloudFiles) throws HyracksDataException {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
index 7539aa7..a3079e6 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
@@ -180,8 +180,8 @@
     @Override
     public synchronized void add(Collection<FileReference> files) {
         LOGGER.info("Uncache {}", files);
+        // Only data files can be 'uncached'
         uncachedDataFiles.putAll(getFiles(files, DATA_FILTER));
-        uncachedMetadataFiles.putAll(getFiles(files, METADATA_FILTER));
     }
 
     @Override
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
index 934468a..218015d 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.cloud.lazy.accessor;
 
+import static org.apache.asterix.cloud.util.CloudFileUtil.DATA_FILTER;
+
 import java.util.Collection;
 import java.util.Set;
 
@@ -50,10 +52,21 @@
             throw new IllegalStateException(directory + " is not a directory");
         }
 
-        // TODO only delete data files?
-        Collection<FileReference> uncachedFiles = UncachedFileReference.toUncached(localIoManager.list(directory));
+        // Get a list of all data files
+        Collection<FileReference> files = localIoManager.list(directory, DATA_FILTER);
+        if (files.isEmpty()) {
+            // Nothing to evict
+            return;
+        }
+
+        // Convert file references to uncached ones
+        Collection<FileReference> uncachedFiles = UncachedFileReference.toUncached(files);
+        // Add all data files to the cacher to indicate they are in a 'cacheable' state (i.e., not downloaded)
         cacher.add(uncachedFiles);
-        localIoManager.delete(directory);
+        // Delete all data files from the local drive
+        for (FileReference uncachedFile : uncachedFiles) {
+            localIoManager.delete(uncachedFile);
+        }
     }
 
     @Override
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
index 4ea6c46..406f2f3 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
@@ -18,17 +18,21 @@
  */
 package org.apache.asterix.cloud.lazy.filesystem;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.asterix.cloud.AbstractCloudIOManager;
+import org.apache.asterix.cloud.CloudFileHandle;
 import org.apache.asterix.cloud.IWriteBufferProvider;
 import org.apache.asterix.common.cloud.CloudCachePolicy;
 import org.apache.asterix.common.config.CloudProperties;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.cloud.filesystem.FileSystemOperationDispatcherUtil;
 
 public final class HolePuncherProvider {
     private static final IHolePuncher UNSUPPORTED = HolePuncherProvider::unsupported;
+    private static final IHolePuncher LINUX = HolePuncherProvider::linuxPunchHole;
 
     private HolePuncherProvider() {
     }
@@ -39,13 +43,35 @@
             return UNSUPPORTED;
         }
 
-        return new DebugHolePuncher(cloudIOManager, bufferProvider);
+        if (FileSystemOperationDispatcherUtil.isLinux()) {
+            return LINUX;
+        } else if (cloudProperties.isStorageDebugModeEnabled()) {
+            // Running in debug mode on a non-Linux box
+            return new DebugHolePuncher(cloudIOManager, bufferProvider);
+        }
+
+        throw new UnsupportedOperationException(
+                "Hole puncher is not supported using " + FileSystemOperationDispatcherUtil.getOSName());
     }
 
     private static int unsupported(IFileHandle fileHandle, long offset, long length) {
         throw new UnsupportedOperationException("punchHole is not supported");
     }
 
+    private static int linuxPunchHole(IFileHandle fileHandle, long offset, long length) throws HyracksDataException {
+        CloudFileHandle cloudFileHandle = (CloudFileHandle) fileHandle;
+        int fileDescriptor = cloudFileHandle.getFileDescriptor();
+        int blockSize = cloudFileHandle.getBlockSize();
+        int freedSpace = FileSystemOperationDispatcherUtil.punchHole(fileDescriptor, offset, length, blockSize);
+        try {
+            cloudFileHandle.getFileChannel().force(false);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+
+        return freedSpace;
+    }
+
     private static final class DebugHolePuncher implements IHolePuncher {
         private final AbstractCloudIOManager cloudIOManager;
         private final IWriteBufferProvider bufferProvider;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 8e6becf..888cea1 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -43,6 +43,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
 import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 import org.apache.hyracks.util.cache.ICacheManager;
 
@@ -152,4 +153,9 @@
      * @return the disk write rate limiter provider
      */
     IDiskWriteRateLimiterProvider getDiskWriteRateLimiterProvider();
+
+    /**
+     * @return disk cache service
+     */
+    IDiskCacheMonitoringService getDiskCacheService();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
index 4b7649a8..5c23f5c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
@@ -19,8 +19,12 @@
 package org.apache.asterix.common.config;
 
 import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
+import static org.apache.hyracks.control.common.config.OptionTypes.DOUBLE;
+import static org.apache.hyracks.control.common.config.OptionTypes.LONG_BYTE_UNIT;
 import static org.apache.hyracks.control.common.config.OptionTypes.NONNEGATIVE_INTEGER;
+import static org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER;
 import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
+import static org.apache.hyracks.util.StorageUtil.StorageUnit.GIGABYTE;
 
 import java.util.concurrent.TimeUnit;
 
@@ -28,6 +32,7 @@
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.config.IOptionType;
 import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.util.StorageUtil;
 
 public class CloudProperties extends AbstractProperties {
 
@@ -43,6 +48,14 @@
         CLOUD_STORAGE_ENDPOINT(STRING, ""),
         CLOUD_STORAGE_ANONYMOUS_AUTH(BOOLEAN, false),
         CLOUD_STORAGE_CACHE_POLICY(STRING, "lazy"),
+        // 80% of the total disk space
+        CLOUD_STORAGE_ALLOCATION_PERCENTAGE(DOUBLE, 0.8d),
+        // 90% of the allocated space for storage (i.e., 90% of the 80% of the total disk space)
+        CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE(DOUBLE, 0.9d),
+        CLOUD_STORAGE_DISK_MONITOR_INTERVAL(POSITIVE_INTEGER, 60),
+        CLOUD_STORAGE_INDEX_INACTIVE_DURATION_THRESHOLD(POSITIVE_INTEGER, 6),
+        CLOUD_STORAGE_DEBUG_MODE_ENABLED(BOOLEAN, false),
+        CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE(LONG_BYTE_UNIT, StorageUtil.getLongSizeInBytes(1, GIGABYTE)),
         CLOUD_PROFILER_LOG_INTERVAL(NONNEGATIVE_INTEGER, 0);
 
         private final IOptionType interpreter;
@@ -63,6 +76,12 @@
                 case CLOUD_STORAGE_ENDPOINT:
                 case CLOUD_STORAGE_ANONYMOUS_AUTH:
                 case CLOUD_STORAGE_CACHE_POLICY:
+                case CLOUD_STORAGE_ALLOCATION_PERCENTAGE:
+                case CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE:
+                case CLOUD_STORAGE_DISK_MONITOR_INTERVAL:
+                case CLOUD_STORAGE_INDEX_INACTIVE_DURATION_THRESHOLD:
+                case CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE:
+                case CLOUD_STORAGE_DEBUG_MODE_ENABLED:
                 case CLOUD_PROFILER_LOG_INTERVAL:
                     return Section.COMMON;
                 default:
@@ -86,9 +105,36 @@
                 case CLOUD_STORAGE_ANONYMOUS_AUTH:
                     return "Indicates whether or not anonymous auth should be used for the cloud storage";
                 case CLOUD_STORAGE_CACHE_POLICY:
-                    return "The caching policy (either eager or lazy). 'Eager' caching will download all partitions"
-                            + " upon booting, whereas lazy caching will download a file upon request to open it."
+                    return "The caching policy (either eager, lazy or selective). 'eager' caching will download"
+                            + "all partitions upon booting, whereas 'lazy' caching will download a file upon"
+                            + " request to open it. 'selective' caching will act as the 'lazy' policy; however, "
+                            + " it allows to use the local disk(s) as a cache, where pages and indexes can be "
+                            + " cached or evicted according to the pressure imposed on the local disks."
                             + " (default: 'lazy')";
+                case CLOUD_STORAGE_ALLOCATION_PERCENTAGE:
+                    return "The percentage of the total disk space that should be allocated for data storage when the"
+                            + " 'selective' caching policy is used. The remaining will act as a buffer for "
+                            + " query workspace (i.e., for query operations that require spilling to disk)."
+                            + " (default: 80% of the total disk space)";
+                case CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE:
+                    return "The percentage of the used storage space at which the disk sweeper starts freeing space by"
+                            + " punching holes in stored indexes or by evicting them entirely, "
+                            + " when the 'selective' caching policy is used."
+                            + " (default: 90% of the allocated space for storage)";
+                case CLOUD_STORAGE_DISK_MONITOR_INTERVAL:
+                    return "The disk monitoring interval time (in seconds): determines how often the system"
+                            + " checks for pressure on disk space when using the 'selective' caching policy."
+                            + " (default : 60 seconds)";
+                case CLOUD_STORAGE_INDEX_INACTIVE_DURATION_THRESHOLD:
+                    return "The duration in minutes to consider an index is inactive. (default: 360 or 6 hours)";
+                case CLOUD_STORAGE_DEBUG_MODE_ENABLED:
+                    return "Whether or not the debug mode is enabled when using the 'selective' caching policy."
+                            + "(default: false)";
+                case CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE:
+                    return "For debugging only. Pressure size will be the current used space + the additional bytes"
+                            + " provided by this configuration option instead of using "
+                            + " CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE."
+                            + " (default: 0. I.e., CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE will be used by default)";
                 case CLOUD_PROFILER_LOG_INTERVAL:
                     return "The waiting time (in minutes) to log cloud request statistics (default: 0, which means"
                             + " the profiler is disabled by default). The minimum is 1 minute."
@@ -138,6 +184,31 @@
         return CloudCachePolicy.fromName(accessor.getString(Option.CLOUD_STORAGE_CACHE_POLICY));
     }
 
+    public double getStorageAllocationPercentage() {
+        return accessor.getDouble(Option.CLOUD_STORAGE_ALLOCATION_PERCENTAGE);
+    }
+
+    public double getStorageSweepThresholdPercentage() {
+        return accessor.getDouble(Option.CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE);
+    }
+
+    public int getStorageDiskMonitorInterval() {
+        return accessor.getInt(Option.CLOUD_STORAGE_DISK_MONITOR_INTERVAL);
+    }
+
+    public long getStorageIndexInactiveDurationThreshold() {
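+        // The option is configured in minutes but consumed as nanoseconds by the sweeper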
+        int minutes = accessor.getInt(Option.CLOUD_STORAGE_INDEX_INACTIVE_DURATION_THRESHOLD);
+        return TimeUnit.MINUTES.toNanos(minutes);
+    }
+
+    public boolean isStorageDebugModeEnabled() {
+        return accessor.getBoolean(Option.CLOUD_STORAGE_DEBUG_MODE_ENABLED);
+    }
+
+    public long getStorageDebugSweepThresholdSize() {
+        return isStorageDebugModeEnabled() ? accessor.getLong(Option.CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE) : 0L;
+    }
+
     public long getProfilerLogInterval() {
         long interval = TimeUnit.MINUTES.toNanos(accessor.getInt(Option.CLOUD_PROFILER_LOG_INTERVAL));
         return interval == 0 ? 0 : Math.max(interval, TimeUnit.MINUTES.toNanos(1));
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 1a45155..07801a9 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -59,6 +59,7 @@
 import org.apache.hyracks.storage.common.LocalResource;
 import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
 import org.apache.hyracks.storage.common.buffercache.SleepRateLimiter;
+import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -71,6 +72,7 @@
     private final IVirtualBufferCache vbc;
     private final ILogManager logManager;
     private final LogRecord waitLog;
+    private final IDiskResourceCacheLockNotifier lockNotifier;
     private volatile boolean stopped = false;
     private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
     // all LSM-trees share the same virtual buffer cache list
@@ -78,7 +80,8 @@
 
     public DatasetLifecycleManager(StorageProperties storageProperties, ILocalResourceRepository resourceRepository,
             ILogManager logManager, IVirtualBufferCache vbc,
-            IIndexCheckpointManagerProvider indexCheckpointManagerProvider, int numPartitions) {
+            IIndexCheckpointManagerProvider indexCheckpointManagerProvider,
+            IDiskResourceCacheLockNotifier lockNotifier) {
         this.logManager = logManager;
         this.storageProperties = storageProperties;
         this.resourceRepository = resourceRepository;
@@ -89,6 +92,7 @@
             vbcs.add(vbc);
         }
         this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
+        this.lockNotifier = lockNotifier;
         waitLog = new LogRecord();
         waitLog.setLogType(LogType.WAIT_FOR_FLUSHES);
         waitLog.computeAndSetLogSize();
@@ -122,6 +126,7 @@
             datasetResource = getDatasetLifecycle(did);
         }
         datasetResource.register(resource, (ILSMIndex) index);
+        lockNotifier.onRegister(resource, index, datasetResource.getIndexInfo(resource.getId()).getPartition());
     }
 
     private int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
@@ -145,7 +150,6 @@
         validateDatasetLifecycleManagerState();
         int did = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
-
         DatasetResource dsr = datasets.get(did);
         IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
 
@@ -153,6 +157,7 @@
             throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
         }
 
+        lockNotifier.onUnregister(resourceID);
         PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
         if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
             if (LOGGER.isErrorEnabled()) {
@@ -190,6 +195,9 @@
         int did = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
 
+        // Notify the lock notifier before opening the resource
+        lockNotifier.onOpen(resourceID);
+
         DatasetResource dsr = datasets.get(did);
         DatasetInfo dsInfo = dsr.getDatasetInfo();
         if (dsInfo == null || !dsInfo.isRegistered()) {
@@ -253,6 +261,7 @@
             if (iInfo == null) {
                 throw HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, resourceID);
             }
+            lockNotifier.onClose(resourceID);
         } finally {
             // Regardless of what exception is thrown in the try-block (e.g., line 279),
             // we have to un-touch the index and dataset.
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java
index 04f594e..620417b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.IStorageManager;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
 import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
 import com.fasterxml.jackson.databind.JsonNode;
@@ -45,32 +46,32 @@
 
     @Override
     public ILSMIOOperationScheduler getIoScheduler(INCServiceContext ctx) {
-        return ((INcApplicationContext) ctx.getApplicationContext()).getLSMIOScheduler();
+        return getAppCtx(ctx).getLSMIOScheduler();
     }
 
     @Override
     public IIOManager getIoManager(INCServiceContext ctx) {
-        return ((INcApplicationContext) ctx.getApplicationContext()).getPersistenceIoManager();
+        return getAppCtx(ctx).getPersistenceIoManager();
     }
 
     @Override
     public IBufferCache getBufferCache(INCServiceContext ctx) {
-        return ((INcApplicationContext) ctx.getApplicationContext()).getBufferCache();
+        return getAppCtx(ctx).getBufferCache();
     }
 
     @Override
     public ILocalResourceRepository getLocalResourceRepository(INCServiceContext ctx) {
-        return ((INcApplicationContext) ctx.getApplicationContext()).getLocalResourceRepository();
+        return getAppCtx(ctx).getLocalResourceRepository();
     }
 
     @Override
     public IDatasetLifecycleManager getLifecycleManager(INCServiceContext ctx) {
-        return ((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager();
+        return getAppCtx(ctx).getDatasetLifecycleManager();
     }
 
     @Override
     public IResourceIdFactory getResourceIdFactory(INCServiceContext ctx) {
-        return ((INcApplicationContext) ctx.getApplicationContext()).getResourceIdFactory();
+        return getAppCtx(ctx).getResourceIdFactory();
     }
 
     @Override
@@ -78,6 +79,16 @@
         return registry.getClassIdentifier(getClass(), serialVersionUID);
     }
 
+    @Override
+    public IDiskCacheMonitoringService getDiskCacheMonitoringService(INCServiceContext ctx) {
+        return getAppCtx(ctx).getDiskCacheService();
+
+    }
+
+    private INcApplicationContext getAppCtx(INCServiceContext ctx) {
+        return ((INcApplicationContext) ctx.getApplicationContext());
+    }
+
     @SuppressWarnings("squid:S1172") // unused parameter
     public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json) {
         return RUNTIME_PROVIDER;
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java
index 381d5eb..ba513fa 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java
@@ -18,9 +18,11 @@
  */
 package org.apache.hyracks.cloud.cache.service;
 
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.LocalResource;
 import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
 import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
 import org.apache.hyracks.storage.common.disk.prefetch.AbstractPrefetchRequest;
@@ -55,6 +57,11 @@
     }
 
     @Override
+    public void reportLocalResources(Map<Long, LocalResource> localResources) {
+        monitorThread.reportLocalResources(localResources);
+    }
+
+    @Override
     public IPhysicalDrive getPhysicalDrive() {
         return drive;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
index 55bab43..036a812 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
@@ -18,67 +18,151 @@
  */
 package org.apache.hyracks.cloud.cache.service;
 
-import org.apache.hyracks.cloud.cache.unit.DatasetUnit;
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hyracks.cloud.cache.unit.AbstractIndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
+import org.apache.hyracks.cloud.cache.unit.UnsweepableIndexUnit;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.LocalResource;
 import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
 
-// TODO locking should be revised
 public final class CloudDiskResourceCacheLockNotifier implements IDiskResourceCacheLockNotifier {
-    private final Int2ObjectMap<DatasetUnit> datasets;
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final int metadataPartition;
+    private final Long2ObjectMap<LocalResource> inactiveResources;
+    private final Long2ObjectMap<UnsweepableIndexUnit> unsweepableIndexes;
+    private final Long2ObjectMap<SweepableIndexUnit> sweepableIndexes;
+    private final ReentrantReadWriteLock evictionLock;
 
-    public CloudDiskResourceCacheLockNotifier() {
-        datasets = Int2ObjectMaps.synchronize(new Int2ObjectOpenHashMap<>());
+    public CloudDiskResourceCacheLockNotifier(int metadataPartition) {
+        this.metadataPartition = metadataPartition;
+        inactiveResources = Long2ObjectMaps.synchronize(new Long2ObjectOpenHashMap<>());
+        unsweepableIndexes = Long2ObjectMaps.synchronize(new Long2ObjectOpenHashMap<>());
+        sweepableIndexes = Long2ObjectMaps.synchronize(new Long2ObjectOpenHashMap<>());
+        evictionLock = new ReentrantReadWriteLock();
     }
 
     @Override
-    public void onRegister(int datasetId, LocalResource localResource, IIndex index) {
+    public void onRegister(LocalResource localResource, IIndex index, int partition) {
         ILSMIndex lsmIndex = (ILSMIndex) index;
-        if (lsmIndex.getDiskCacheManager().isSweepable()) {
-            DatasetUnit datasetUnit = datasets.computeIfAbsent(datasetId, DatasetUnit::new);
-            datasetUnit.addIndex(localResource.getId(), lsmIndex);
-        }
-    }
-
-    @Override
-    public void onUnregister(int datasetId, long resourceId) {
-        DatasetUnit datasetUnit = datasets.get(datasetId);
-        if (datasetUnit != null && datasetUnit.dropIndex(resourceId)) {
-            datasets.remove(datasetId);
-
-            // TODO invalidate eviction plans if the disk is not pressured
-        }
-    }
-
-    @Override
-    public void onOpen(int datasetId, long resourceId) {
-        DatasetUnit datasetUnit = datasets.get(datasetId);
-        if (datasetUnit != null) {
-            IndexUnit indexUnit = datasetUnit.getIndex(resourceId);
-            if (indexUnit != null) {
-                indexUnit.readLock();
+        evictionLock.readLock().lock();
+        try {
+            if (partition != metadataPartition) {
+                long resourceId = localResource.getId();
+                if (lsmIndex.getDiskCacheManager().isSweepable()) {
+                    sweepableIndexes.put(resourceId, new SweepableIndexUnit(localResource, lsmIndex));
+                } else {
+                    unsweepableIndexes.put(resourceId, new UnsweepableIndexUnit(localResource));
+                }
             }
+            inactiveResources.remove(localResource.getId());
+        } finally {
+            evictionLock.readLock().unlock();
         }
     }
 
     @Override
-    public void onClose(int datasetId, long resourceId) {
-        DatasetUnit datasetUnit = datasets.get(datasetId);
-        if (datasetUnit != null) {
-            IndexUnit indexUnit = datasetUnit.getIndex(resourceId);
+    public void onUnregister(long resourceId) {
+        evictionLock.readLock().lock();
+        try {
+            AbstractIndexUnit indexUnit = getUnit(resourceId);
             if (indexUnit != null) {
-                indexUnit.readUnlock();
+                indexUnit.drop();
+            } else {
+                inactiveResources.remove(resourceId);
             }
+        } finally {
+            evictionLock.readLock().unlock();
         }
     }
 
-    Int2ObjectMap<DatasetUnit> getDatasets() {
-        return datasets;
+    private AbstractIndexUnit getUnit(long resourceId) {
+        AbstractIndexUnit indexUnit = sweepableIndexes.get(resourceId);
+        if (indexUnit == null) {
+            indexUnit = unsweepableIndexes.get(resourceId);
+        }
+        return indexUnit;
+    }
+
+    @Override
+    public void onOpen(long resourceId) {
+        evictionLock.readLock().lock();
+        try {
+            AbstractIndexUnit indexUnit = getUnit(resourceId);
+            if (indexUnit == null) {
+                // Metadata resource
+                return;
+            }
+            indexUnit.open();
+        } finally {
+            evictionLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public void onClose(long resourceId) {
+        evictionLock.readLock().lock();
+        try {
+            AbstractIndexUnit indexUnit = getUnit(resourceId);
+            if (indexUnit == null) {
+                // Metadata resource
+                return;
+            }
+            indexUnit.close();
+        } finally {
+            evictionLock.readLock().unlock();
+        }
+
+    }
+
+    ReentrantReadWriteLock getEvictionLock() {
+        return evictionLock;
+    }
+
+    void reportLocalResources(Map<Long, LocalResource> localResources) {
+        inactiveResources.clear();
+        // First, determine which of the reported resources are already registered
+        for (LocalResource lr : localResources.values()) {
+            if (unsweepableIndexes.containsKey(lr.getId()) || sweepableIndexes.containsKey(lr.getId())) {
+                // We already have this resource
+                continue;
+            }
+
+            // Probably a new resource or an old resource that wasn't registered before
+            inactiveResources.put(lr.getId(), lr);
+        }
+
+        removeUnassignedResources(unsweepableIndexes, localResources);
+        removeUnassignedResources(sweepableIndexes, localResources);
+
+        LOGGER.info("Retained active {unsweepable: {}, sweepable: {}} and inactive: {}", unsweepableIndexes,
+                sweepableIndexes, inactiveResources.values().stream()
+                        .map(x -> "(id: " + x.getId() + ",  path: " + x.getPath() + ")").toList());
+    }
+
+    private void removeUnassignedResources(Long2ObjectMap<?> indexes, Map<Long, LocalResource> localResources) {
+        indexes.long2ObjectEntrySet().removeIf(x -> !localResources.containsKey(x.getLongKey()));
+    }
+
+    Collection<LocalResource> getInactiveResources() {
+        return inactiveResources.values();
+    }
+
+    Collection<UnsweepableIndexUnit> getUnsweepableIndexes() {
+        return unsweepableIndexes.values();
+    }
+
+    void getSweepableIndexes(Collection<SweepableIndexUnit> indexes) {
+        indexes.addAll(sweepableIndexes.values());
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
index 79b978e..f594e06 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -27,11 +28,12 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IDiskSpaceMaker;
-import org.apache.hyracks.cloud.cache.unit.DatasetUnit;
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
+import org.apache.hyracks.cloud.cache.unit.UnsweepableIndexUnit;
 import org.apache.hyracks.cloud.io.ICloudIOManager;
 import org.apache.hyracks.cloud.sweeper.ISweeper;
 import org.apache.hyracks.cloud.sweeper.Sweeper;
+import org.apache.hyracks.storage.common.LocalResource;
 import org.apache.hyracks.storage.common.buffercache.BufferCache;
 import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
@@ -44,21 +46,29 @@
     private final long waitTime;
     private final CloudDiskResourceCacheLockNotifier resourceManager;
     private final IPhysicalDrive physicalDrive;
-    private final List<IndexUnit> indexes;
+    private final List<SweepableIndexUnit> indexes;
+    private final ICloudIOManager cloudIOManager;
     private final ISweeper sweeper;
+    private final long inactiveTimeThreshold;
 
     public DiskCacheSweeperThread(ExecutorService executorService, long waitTime,
             CloudDiskResourceCacheLockNotifier resourceManager, ICloudIOManager cloudIOManager, int numOfSweepThreads,
             int sweepQueueSize, IPhysicalDrive physicalDrive, BufferCache bufferCache,
-            Map<Integer, BufferedFileHandle> fileInfoMap) {
+            Map<Integer, BufferedFileHandle> fileInfoMap, long inactiveTimeThreshold) {
         this.waitTime = TimeUnit.SECONDS.toMillis(waitTime);
         this.resourceManager = resourceManager;
         this.physicalDrive = physicalDrive;
+        this.inactiveTimeThreshold = inactiveTimeThreshold;
         indexes = new ArrayList<>();
+        this.cloudIOManager = cloudIOManager;
         sweeper = new Sweeper(executorService, cloudIOManager, bufferCache, fileInfoMap, numOfSweepThreads,
                 sweepQueueSize);
     }
 
+    public void reportLocalResources(Map<Long, LocalResource> localResources) {
+        resourceManager.reportLocalResources(localResources);
+    }
+
     @Override
     public void makeSpaceOrThrow(IOException ioException) throws HyracksDataException {
         if (ioException.getMessage().contains("no space")) {
@@ -83,7 +93,7 @@
         while (true) {
             synchronized (this) {
                 try {
-                    sweep();
+                    makeSpace();
                     wait(waitTime);
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
@@ -94,20 +104,66 @@
         }
     }
 
-    private void sweep() {
+    private void makeSpace() {
         if (physicalDrive.computeAndCheckIsPressured()) {
-            for (DatasetUnit dataset : resourceManager.getDatasets().values()) {
-                indexes.clear();
-                dataset.getIndexes(indexes);
-                sweepIndexes(sweeper, indexes);
+            boolean shouldSweep;
+            resourceManager.getEvictionLock().writeLock().lock();
+            try {
+                shouldSweep = evictInactive();
+            } finally {
+                resourceManager.getEvictionLock().writeLock().unlock();
+            }
+
+            if (shouldSweep) {
+                // index eviction didn't help. Sweep!
+                sweep();
             }
         }
     }
 
+    private boolean evictInactive() {
+        long now = System.nanoTime();
+        Collection<LocalResource> inactiveResources = resourceManager.getInactiveResources();
+        Collection<UnsweepableIndexUnit> unsweepableIndexes = resourceManager.getUnsweepableIndexes();
+        if (inactiveResources.isEmpty() && unsweepableIndexes.isEmpty()) {
+            // nothing to evict; return true so the caller proceeds to sweep
+            return true;
+        }
+
+        // First, evict all resources that have never been registered
+        for (LocalResource resource : inactiveResources) {
+            try {
+                cloudIOManager.evict(resource.getPath());
+            } catch (HyracksDataException e) {
+                LOGGER.error("Failed to evict resource " + resource.getPath(), e);
+            }
+        }
+
+        // Next evict all inactive indexes
+        for (UnsweepableIndexUnit index : unsweepableIndexes) {
+            if (now - index.getLastAccessTime() >= inactiveTimeThreshold) {
+                try {
+                    cloudIOManager.evict(index.getPath());
+                } catch (HyracksDataException e) {
+                    LOGGER.error("Failed to evict resource " + index.getPath(), e);
+                }
+            }
+        }
+
+        // If disk is still pressured, proceed with sweep
+        return physicalDrive.computeAndCheckIsPressured();
+    }
+
+    private void sweep() {
+        indexes.clear();
+        resourceManager.getSweepableIndexes(indexes);
+        sweepIndexes(sweeper, indexes);
+    }
+
     @CriticalPath
-    private static void sweepIndexes(ISweeper sweeper, List<IndexUnit> indexes) {
+    private static void sweepIndexes(ISweeper sweeper, List<SweepableIndexUnit> indexes) {
         for (int i = 0; i < indexes.size(); i++) {
-            IndexUnit index = indexes.get(i);
+            SweepableIndexUnit index = indexes.get(i);
             if (!index.isSweeping()) {
                 try {
                     sweeper.sweep(index);
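
The make-space policy implemented above can be summarized by a compressed, hypothetical sketch (the real thread additionally takes the eviction write lock and delegates to ICloudIOManager and the Sweeper): evict resources that are not in use first, and only fall back to sweeping if the drive is still under pressure.

    import java.util.List;

    final class MakeSpaceSketch {
        interface Drive { boolean isPressured(); }
        interface Evictor { void evict(String path); }

        static void makeSpace(Drive drive, Evictor evictor, List<String> inactivePaths, Runnable sweep) {
            if (!drive.isPressured()) {
                return; // nothing to do
            }
            // 1) cheap step: drop locally cached files of resources nobody is using
            for (String path : inactivePaths) {
                evictor.evict(path);
            }
            // 2) expensive step: sweep (punch holes in) sweepable indexes only if still pressured
            if (drive.isPressured()) {
                sweep.run();
            }
        }
    }
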
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/AbstractIndexUnit.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/AbstractIndexUnit.java
new file mode 100644
index 0000000..1532340
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/AbstractIndexUnit.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.cache.unit;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hyracks.storage.common.LocalResource;
+
+public abstract class AbstractIndexUnit {
+    protected final LocalResource localResource;
+    private final AtomicLong lastAccessTime;
+    private final AtomicInteger openCounter;
+
+    AbstractIndexUnit(LocalResource localResource) {
+        this.localResource = localResource;
+        this.lastAccessTime = new AtomicLong(0);
+        this.openCounter = new AtomicInteger(0);
+    }
+
+    public final void open() {
+        lastAccessTime.set(System.nanoTime());
+        openCounter.incrementAndGet();
+    }
+
+    public final void close() {
+        openCounter.decrementAndGet();
+    }
+
+    public final long getLastAccessTime() {
+        return lastAccessTime.get();
+    }
+
+    public abstract void drop();
+
+    @Override
+    public String toString() {
+        return "(id: " + localResource.getId() + ", path: " + localResource.getPath() + "sweepable: " + isSweepable()
+                + ")";
+    }
+
+    protected abstract boolean isSweepable();
+}
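
This unit backs the inactivity check in DiskCacheSweeperThread. A minimal, self-contained sketch of the intended contract follows (simplified names; the additional openCounter == 0 condition is this sketch's conservative assumption, while the change itself keys eviction off the last access time only).

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;

    final class IndexUnitSketch {
        private final AtomicLong lastAccessNanos = new AtomicLong();
        private final AtomicInteger openCounter = new AtomicInteger();

        void open() {
            lastAccessNanos.set(System.nanoTime());
            openCounter.incrementAndGet(); // mirrored by close()
        }

        void close() {
            openCounter.decrementAndGet();
        }

        boolean isEvictable(long idleThresholdNanos) {
            return openCounter.get() == 0 && System.nanoTime() - lastAccessNanos.get() >= idleThresholdNanos;
        }

        public static void main(String[] args) throws InterruptedException {
            IndexUnitSketch unit = new IndexUnitSketch();
            unit.open();
            unit.close();
            Thread.sleep(5);
            System.out.println(unit.isEvictable(TimeUnit.MILLISECONDS.toNanos(1))); // true: idle for ~5 ms
        }
    }
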
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/DatasetUnit.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/DatasetUnit.java
deleted file mode 100644
index 34b37fb..0000000
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/DatasetUnit.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.cloud.cache.unit;
-
-import java.util.List;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-
-import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
-
-public final class DatasetUnit {
-    private final int id;
-    private final ReentrantReadWriteLock lock;
-    /**
-     * Maps resourceId to {@link IndexUnit}
-     */
-    private final Long2ObjectMap<IndexUnit> indexes;
-
-    public DatasetUnit(int datasetId) {
-        id = datasetId;
-        lock = new ReentrantReadWriteLock();
-        indexes = new Long2ObjectOpenHashMap<>();
-
-    }
-
-    public int getId() {
-        return id;
-    }
-
-    public IndexUnit addIndex(long resourceId, ILSMIndex index) {
-        writeLock();
-        try {
-            IndexUnit indexUnit = new IndexUnit(resourceId, index);
-            indexes.put(resourceId, indexUnit);
-            return indexUnit;
-        } finally {
-            writeUnlock();
-        }
-    }
-
-    public boolean dropIndex(long resourceId) {
-        IndexUnit indexUnit = indexes.remove(resourceId);
-        // Signal that the index is being dropped so a sweeper thread does not sweep this index or stops sweeping
-        indexUnit.setDropped();
-        // Wait for the sweep operation (if running) before allowing the index to be dropped
-        indexUnit.waitForSweep();
-        return indexUnit.getIndex().isPrimaryIndex();
-    }
-
-    public IndexUnit getIndex(long resourceId) {
-        readLock();
-        try {
-            return indexes.get(resourceId);
-        } finally {
-            readUnlock();
-        }
-    }
-
-    /**
-     * Return the current indexes
-     *
-     * @param indexUnits container used to return the current indexes
-     */
-    public void getIndexes(List<IndexUnit> indexUnits) {
-        readLock();
-        try {
-            indexUnits.addAll(indexes.values());
-        } finally {
-            readUnlock();
-        }
-    }
-
-    private void readLock() {
-        lock.readLock().lock();
-    }
-
-    private void readUnlock() {
-        lock.readLock().unlock();
-    }
-
-    private void writeLock() {
-        lock.writeLock().lock();
-    }
-
-    private void writeUnlock() {
-        lock.writeLock().unlock();
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/IndexUnit.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/SweepableIndexUnit.java
similarity index 77%
rename from hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/IndexUnit.java
rename to hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/SweepableIndexUnit.java
index 8f8b412..794cafc 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/IndexUnit.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/SweepableIndexUnit.java
@@ -19,39 +19,40 @@
 package org.apache.hyracks.cloud.cache.unit;
 
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.common.LocalResource;
 
-// TODO allow evicting an index entirely
-public final class IndexUnit {
-    private final long id;
+public final class SweepableIndexUnit extends AbstractIndexUnit {
     private final ILSMIndex index;
     private final AtomicBoolean dropped;
     private final AtomicBoolean sweeping;
-    private final AtomicInteger readCounter;
 
-    public IndexUnit(long resourceId, ILSMIndex index) {
-        this.id = resourceId;
+    public SweepableIndexUnit(LocalResource localResource, ILSMIndex index) {
+        super(localResource);
         this.index = index;
         dropped = new AtomicBoolean(false);
         sweeping = new AtomicBoolean(false);
-        readCounter = new AtomicInteger(0);
     }
 
-    public long getId() {
-        return id;
+    @Override
+    public void drop() {
+        // Signal that the index is being dropped so a sweeper thread does not start (or stops) sweeping it
+        dropped.set(true);
+        // Wait for the sweep operation (if running) before allowing the index to be dropped
+        waitForSweep();
+    }
+
+    @Override
+    protected boolean isSweepable() {
+        return true;
     }
 
     public ILSMIndex getIndex() {
         return index;
     }
 
-    public void setDropped() {
-        dropped.set(false);
-    }
-
     public boolean isDropped() {
         return dropped.get();
     }
@@ -79,13 +80,4 @@
             sweeping.notifyAll();
         }
     }
-
-    public void readLock() {
-        readCounter.incrementAndGet();
-    }
-
-    public void readUnlock() {
-        readCounter.decrementAndGet();
-    }
-
 }
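
The drop/sweep handshake above follows a flag-and-wait pattern; a stripped-down sketch (simplified, without InvokeUtil or the LSM index itself): dropping sets a flag the sweeper checks before and during a sweep, then waits out any in-flight sweep.

    import java.util.concurrent.atomic.AtomicBoolean;

    final class DropSweepSketch {
        private final AtomicBoolean dropped = new AtomicBoolean(false);
        private final AtomicBoolean sweeping = new AtomicBoolean(false);

        // sweeper side
        boolean startSweeping() {
            return !dropped.get() && sweeping.compareAndSet(false, true);
        }

        void finishSweeping() {
            synchronized (sweeping) {
                sweeping.set(false);
                sweeping.notifyAll();
            }
        }

        // dropper side
        void drop() throws InterruptedException {
            dropped.set(true); // tell the sweeper not to start (or to stop) sweeping this index
            synchronized (sweeping) {
                while (sweeping.get()) {
                    sweeping.wait(); // wait out an in-flight sweep before the index files go away
                }
            }
        }
    }
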
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/UnsweepableIndexUnit.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/UnsweepableIndexUnit.java
new file mode 100644
index 0000000..204fe68
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/UnsweepableIndexUnit.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.cache.unit;
+
+import org.apache.hyracks.storage.common.LocalResource;
+
+public final class UnsweepableIndexUnit extends AbstractIndexUnit {
+    public UnsweepableIndexUnit(LocalResource localResource) {
+        super(localResource);
+    }
+
+    @Override
+    public void drop() {
+        // NoOp
+    }
+
+    @Override
+    protected boolean isSweepable() {
+        return false;
+    }
+
+    public String getPath() {
+        return localResource.getPath();
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
index 83b7629..0dca417 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
@@ -22,7 +22,6 @@
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IFileHandle;
 
 /**
@@ -86,9 +85,9 @@
     int punchHole(IFileHandle fHandle, long offset, long length) throws HyracksDataException;
 
     /**
-     * Evict a directory from the local disk cache
+     * Evict a resource from the local disk cache
      *
-     * @param directory to evict
+     * @param resourcePath to evict
      */
-    void evict(FileReference directory) throws HyracksDataException;
+    void evict(String resourcePath) throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java
index d064fda..9067a3f 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java
@@ -18,7 +18,7 @@
  */
 package org.apache.hyracks.cloud.sweeper;
 
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
 import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
 
 /**
@@ -31,5 +31,5 @@
      *
      * @param indexUnit to sweep
      */
-    void sweep(IndexUnit indexUnit) throws InterruptedException;
+    void sweep(SweepableIndexUnit indexUnit) throws InterruptedException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
index 0410e3b..ca103ab 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
@@ -18,7 +18,7 @@
  */
 package org.apache.hyracks.cloud.sweeper;
 
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
 
 public final class NoOpSweeper implements ISweeper {
     public static final ISweeper INSTANCE = new NoOpSweeper();
@@ -27,7 +27,7 @@
     }
 
     @Override
-    public void sweep(IndexUnit indexUnit) {
+    public void sweep(SweepableIndexUnit indexUnit) {
         // NoOp
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
index 79fdb5a..c709cc6 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
@@ -23,7 +23,7 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
 import org.apache.hyracks.cloud.io.ICloudIOManager;
 import org.apache.hyracks.storage.common.buffercache.BufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
@@ -36,7 +36,7 @@
     private final BufferCache bufferCache;
     private final Map<Integer, BufferedFileHandle> fileInfoMap;
     private final AtomicBoolean shutdown;
-    private IndexUnit indexUnit;
+    private SweepableIndexUnit indexUnit;
     private BufferedFileHandle handle;
 
     public SweepContext(ICloudIOManager cloudIOManager, BufferCache bufferCache,
@@ -74,11 +74,11 @@
         bufferCache.unpin(page, bcOpCtx);
     }
 
-    public void setIndexUnit(IndexUnit indexUnit) {
+    public void setIndexUnit(SweepableIndexUnit indexUnit) {
         this.indexUnit = indexUnit;
     }
 
-    public IndexUnit getIndexUnit() {
+    public SweepableIndexUnit getIndexUnit() {
         return indexUnit;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
index e5d368c..245c957 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
@@ -26,7 +26,7 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.InvokeUtil;
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
 import org.apache.hyracks.cloud.io.ICloudIOManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.cloud.IIndexDiskCacheManager;
@@ -56,7 +56,7 @@
     }
 
     @Override
-    public void sweep(IndexUnit indexUnit) throws InterruptedException {
+    public void sweep(SweepableIndexUnit indexUnit) throws InterruptedException {
         SweepRequest request = freeRequests.take();
         request.reset(indexUnit);
         requests.put(request);
@@ -131,7 +131,7 @@
             this.context = context;
         }
 
-        void reset(IndexUnit indexUnit) {
+        void reset(SweepableIndexUnit indexUnit) {
             context.setIndexUnit(indexUnit);
         }
 
@@ -151,7 +151,7 @@
                  */
                 return;
             }
-            IndexUnit indexUnit = context.getIndexUnit();
+            SweepableIndexUnit indexUnit = context.getIndexUnit();
             indexUnit.startSweeping();
             try {
                 ILSMIndex index = indexUnit.getIndex();
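
The Sweeper hands work to its threads through a fixed pool of reusable request objects; the bounded producer/consumer pattern behind freeRequests/requests looks roughly like this self-contained sketch (illustrative types only).

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    final class RequestPoolSketch {
        static final class Request { String payload; }

        public static void main(String[] args) throws InterruptedException {
            int queueSize = 2;
            BlockingQueue<Request> free = new ArrayBlockingQueue<>(queueSize);
            BlockingQueue<Request> pending = new ArrayBlockingQueue<>(queueSize);
            for (int i = 0; i < queueSize; i++) {
                free.put(new Request());
            }

            // submit side (cf. Sweeper.sweep): take a free request, fill it, queue it
            Request r = free.take();
            r.payload = "sweep index 42";
            pending.put(r);

            // worker side: process the request, then recycle it into the free pool
            Request done = pending.take();
            System.out.println(done.payload);
            free.put(done);
        }
    }
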
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index b9f277a..9909e97 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -45,6 +45,7 @@
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IDiskSpaceMaker;
 import org.apache.hyracks.api.io.IFileDeviceResolver;
 import org.apache.hyracks.api.io.IFileHandle;
 import org.apache.hyracks.api.io.IIOBulkOperation;
@@ -82,6 +83,8 @@
      * Mutables
      */
     private int workspaceIndex;
+    // TODO use the space maker on write
+    private IDiskSpaceMaker spaceMaker;
 
     public IOManager(List<IODeviceHandle> devices, IFileDeviceResolver deviceComputer, int ioParallelism, int queueSize)
             throws HyracksDataException {
@@ -112,6 +115,7 @@
         for (int i = 0; i < numIoThreads; i++) {
             executor.execute(new IoRequestHandler(i, submittedRequests));
         }
+        spaceMaker = NoOpDiskSpaceMaker.INSTANCE;
     }
 
     public int getQueueSize() {
@@ -596,4 +600,8 @@
     public void performBulkOperation(IIOBulkOperation bulkOperation) throws HyracksDataException {
         ((AbstractBulkOperation) bulkOperation).performOperation();
     }
+
+    public void setSpaceMaker(IDiskSpaceMaker spaceMaker) {
+        this.spaceMaker = spaceMaker;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/NoOpDiskSpaceMaker.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/NoOpDiskSpaceMaker.java
new file mode 100644
index 0000000..1527dde
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/NoOpDiskSpaceMaker.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io;
+
+import java.io.IOException;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IDiskSpaceMaker;
+
+final class NoOpDiskSpaceMaker implements IDiskSpaceMaker {
+    static final IDiskSpaceMaker INSTANCE = new NoOpDiskSpaceMaker();
+
+    private NoOpDiskSpaceMaker() {
+    }
+
+    @Override
+    public void makeSpaceOrThrow(IOException e) throws HyracksDataException {
+        throw HyracksDataException.create(e);
+    }
+}
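
NoOpDiskSpaceMaker preserves the pre-existing behaviour: a write that runs out of disk space simply fails. The TODO in IOManager hints at the eventual wiring; the following is a hypothetical sketch of such a retry-on-"no space" flow, not something this change implements.

    import java.io.IOException;

    final class SpaceMakerSketch {
        interface DiskSpaceMaker {
            void makeSpaceOrThrow(IOException cause) throws IOException;
        }

        interface Write {
            void run() throws IOException;
        }

        // mirrors NoOpDiskSpaceMaker: nothing to free, so propagate the failure
        static final DiskSpaceMaker NO_OP = cause -> { throw cause; };

        // hypothetical write path: on an out-of-space error, free space once and retry
        static void writeWithRetry(Write write, DiskSpaceMaker spaceMaker) throws IOException {
            try {
                write.run();
            } catch (IOException e) {
                if (e.getMessage() != null && e.getMessage().contains("no space")) {
                    spaceMaker.makeSpaceOrThrow(e); // a cloud deployment would plug in the sweeper here
                    write.run();
                } else {
                    throw e;
                }
            }
        }
    }
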
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/BTreeHelperStorageManager.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/BTreeHelperStorageManager.java
index bf870b1..426e81d 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/BTreeHelperStorageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/BTreeHelperStorageManager.java
@@ -26,6 +26,8 @@
 import org.apache.hyracks.storage.common.IResourceLifecycleManager;
 import org.apache.hyracks.storage.common.IStorageManager;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
+import org.apache.hyracks.storage.common.disk.NoOpDiskCacheMonitoringService;
 import org.apache.hyracks.storage.common.file.ResourceIdFactory;
 
 public class BTreeHelperStorageManager implements IStorageManager {
@@ -60,4 +62,9 @@
     public IResourceLifecycleManager<IIndex> getLifecycleManager(INCServiceContext ctx) {
         return RuntimeContext.get(ctx).getIndexLifecycleManager();
     }
+
+    @Override
+    public IDiskCacheMonitoringService getDiskCacheMonitoringService(INCServiceContext ctx) {
+        return NoOpDiskCacheMonitoringService.INSTANCE;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
index 4932bf0..9fc3b8d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
@@ -27,7 +27,7 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
 import org.apache.hyracks.cloud.sweeper.SweepContext;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
@@ -59,7 +59,7 @@
 
     public long sweep(BitSet plan, SweepContext context, IColumnTupleProjector sweepProjector)
             throws HyracksDataException {
-        IndexUnit indexUnit = context.getIndexUnit();
+        SweepableIndexUnit indexUnit = context.getIndexUnit();
         LSMColumnBTree lsmColumnBTree = (LSMColumnBTree) indexUnit.getIndex();
         IColumnProjectionInfo projectionInfo = captureSweepableComponents(lsmColumnBTree, sweepProjector);
         if (projectionInfo == null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
index a0b592a..7ea4e35 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
@@ -45,6 +45,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
 import org.apache.hyracks.storage.common.IStorageManager;
 import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -87,13 +88,14 @@
         List<IVirtualBufferCache> vbcs = vbcProvider.getVirtualBufferCaches(serviceCtx, file);
         ioOpCallbackFactory.initialize(serviceCtx, this);
         pageWriteCallbackFactory.initialize(serviceCtx, this);
+        IDiskCacheMonitoringService diskCacheService = storageManager.getDiskCacheMonitoringService(serviceCtx);
         return LSMColumnBTreeUtil.createLSMTree(ioManager, vbcs, file, storageManager.getBufferCache(serviceCtx),
                 typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
                 mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
                 opTrackerProvider.getOperationTracker(serviceCtx, this), ioSchedulerProvider.getIoScheduler(serviceCtx),
                 ioOpCallbackFactory, pageWriteCallbackFactory, btreeFields, metadataPageManagerFactory, false,
                 serviceCtx.getTracer(), compressorDecompressorFactory, nullTypeTraits, nullIntrospector,
-                columnManagerFactory, atomic);
+                columnManagerFactory, atomic, diskCacheService);
     }
 
     public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json)
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
index 039aaed..6335acd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
@@ -34,6 +34,7 @@
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnManager;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnManagerFactory;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.CloudColumnIndexDiskCacheManager;
 import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IColumnIndexDiskCacheManager;
 import org.apache.hyracks.storage.am.lsm.btree.column.cloud.NoOpColumnIndexDiskCacheManager;
 import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTree;
@@ -55,6 +56,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
 import org.apache.hyracks.util.trace.ITracer;
 
 public class LSMColumnBTreeUtil {
@@ -66,13 +68,18 @@
             ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
             int[] btreeFields, IMetadataPageManagerFactory freePageManagerFactory, boolean updateAware, ITracer tracer,
             ICompressorDecompressorFactory compressorDecompressorFactory, ITypeTraits nullTypeTraits,
-            INullIntrospector nullIntrospector, IColumnManagerFactory columnManagerFactory, boolean atomic)
-            throws HyracksDataException {
-        // Initialize managers
-        IColumnManager columnManager = columnManagerFactory.createColumnManager();
-        IColumnIndexDiskCacheManager diskCacheManager = NoOpColumnIndexDiskCacheManager.INSTANCE;
+            INullIntrospector nullIntrospector, IColumnManagerFactory columnManagerFactory, boolean atomic,
+            IDiskCacheMonitoringService diskCacheService) throws HyracksDataException {
 
-        //Tuple writers
+        // Initialize managers
+        boolean diskCacheEnabled = diskCacheService.isEnabled();
+        IColumnManager columnManager = columnManagerFactory.createColumnManager();
+        IColumnIndexDiskCacheManager diskCacheManager = diskCacheEnabled
+                ? new CloudColumnIndexDiskCacheManager(columnManager.getNumberOfPrimaryKeys(),
+                        columnManager.getMergeColumnProjector(), diskCacheService.getPhysicalDrive())
+                : NoOpColumnIndexDiskCacheManager.INSTANCE;
+
+        // Tuple writers
         LSMBTreeTupleWriterFactory insertTupleWriterFactory = new LSMBTreeTupleWriterFactory(typeTraits,
                 cmpFactories.length, false, updateAware, nullTypeTraits, nullIntrospector);
         LSMBTreeTupleWriterFactory deleteTupleWriterFactory = new LSMBTreeTupleWriterFactory(typeTraits,
@@ -82,7 +89,7 @@
         LSMBTreeTupleWriterFactory bulkLoadTupleWriterFactory = new LSMBTreeTupleWriterFactory(typeTraits,
                 cmpFactories.length, false, updateAware, nullTypeTraits, nullIntrospector);
 
-        //Leaf frames
+        // Leaf frames
         ITreeIndexFrameFactory flushLeafFrameFactory = new ColumnBTreeLeafFrameFactory(copyTupleWriterFactory,
                 columnManagerFactory.getFlushColumnTupleReaderWriterFactory());
         ITreeIndexFrameFactory mergeLeafFrameFactory = new ColumnBTreeLeafFrameFactory(copyTupleWriterFactory,
@@ -93,7 +100,7 @@
         ITreeIndexFrameFactory deleteLeafFrameFactory = new BTreeNSMLeafFrameFactory(deleteTupleWriterFactory);
         ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(insertTupleWriterFactory);
 
-        //BTree factory
+        // BTree factory
         TreeIndexFactory<ColumnBTree> flushBTreeFactory = new ColumnBTreeFactory(ioManager, diskBufferCache,
                 freePageManagerFactory, interiorFrameFactory, flushLeafFrameFactory, cmpFactories, typeTraits.length);
         TreeIndexFactory<ColumnBTree> mergeBTreeFactory = new ColumnBTreeFactory(ioManager, diskBufferCache,
@@ -102,8 +109,8 @@
                 new ColumnBTreeFactory(ioManager, diskBufferCache, freePageManagerFactory, interiorFrameFactory,
                         bulkLoadLeafFrameFactory, cmpFactories, typeTraits.length);
 
-        ILSMIndexFileManager fileNameManager =
-                new LSMBTreeFileManager(ioManager, file, flushBTreeFactory, true, compressorDecompressorFactory);
+        ILSMIndexFileManager fileNameManager = new LSMBTreeFileManager(ioManager, file, flushBTreeFactory, true,
+                compressorDecompressorFactory, diskCacheEnabled);
 
         BloomFilterFactory bloomFilterFactory = new BloomFilterFactory(diskBufferCache, bloomFilterKeyFields);
         ILSMDiskComponentFactory flushComponentFactory =
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
index 98b82db..e2e49e3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
@@ -45,18 +45,20 @@
             (dir, name) -> !name.startsWith(".") && name.endsWith(BTREE_SUFFIX);
     private final TreeIndexFactory<? extends ITreeIndex> btreeFactory;
     private final boolean hasBloomFilter;
-
-    public LSMBTreeFileManager(IIOManager ioManager, FileReference file,
-            TreeIndexFactory<? extends ITreeIndex> btreeFactory, boolean hasBloomFilter,
-            ICompressorDecompressorFactory compressorDecompressorFactory) {
-        super(ioManager, file, null, compressorDecompressorFactory);
-        this.btreeFactory = btreeFactory;
-        this.hasBloomFilter = hasBloomFilter;
-    }
+    private final boolean allowHoles;
 
     public LSMBTreeFileManager(IIOManager ioManager, FileReference file,
             TreeIndexFactory<? extends ITreeIndex> btreeFactory, boolean hasBloomFilter) {
-        this(ioManager, file, btreeFactory, hasBloomFilter, NoOpCompressorDecompressorFactory.INSTANCE);
+        this(ioManager, file, btreeFactory, hasBloomFilter, NoOpCompressorDecompressorFactory.INSTANCE, false);
+    }
+
+    public LSMBTreeFileManager(IIOManager ioManager, FileReference file,
+            TreeIndexFactory<? extends ITreeIndex> btreeFactory, boolean hasBloomFilter,
+            ICompressorDecompressorFactory compressorDecompressorFactory, boolean allowHoles) {
+        super(ioManager, file, null, compressorDecompressorFactory);
+        this.btreeFactory = btreeFactory;
+        this.hasBloomFilter = hasBloomFilter;
+        this.allowHoles = allowHoles;
     }
 
     @Override
@@ -180,4 +182,9 @@
 
         return validFiles;
     }
+
+    @Override
+    protected boolean areHolesAllowed() {
+        return allowHoles;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
index 3106b29..6bd236c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
@@ -122,7 +122,7 @@
             filterManager = new LSMComponentFilterManager(filterFrameFactory);
         }
         ILSMIndexFileManager fileNameManager = new LSMBTreeFileManager(ioManager, file, diskBTreeFactory,
-                hasBloomFilter, compressorDecompressorFactory);
+                hasBloomFilter, compressorDecompressorFactory, false);
 
         ILSMDiskComponentFactory componentFactory;
         ILSMDiskComponentFactory bulkLoadComponentFactory;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManager.java
index 9d0880e..47492e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManager.java
@@ -24,6 +24,7 @@
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.io.IJsonSerializable;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
 import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
 /**
@@ -59,4 +60,10 @@
      * @return the resource lifecycle manager
      */
     IResourceLifecycleManager<IIndex> getLifecycleManager(INCServiceContext ctx);
+
+    /**
+     * @param ctx the nc service context
+     * @return disk cache monitoring service
+     */
+    IDiskCacheMonitoringService getDiskCacheMonitoringService(INCServiceContext ctx);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java
index 5fb7088..b957782 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java
@@ -18,6 +18,10 @@
  */
 package org.apache.hyracks.storage.common.disk;
 
+import java.util.Map;
+
+import org.apache.hyracks.storage.common.LocalResource;
+
 /**
  * Disk cache monitoring service is responsible for monitoring the local drives
  */
@@ -38,6 +42,13 @@
     boolean isEnabled();
 
     /**
+     * Report all local resources
+     *
+     * @param localResources local resources
+     */
+    void reportLocalResources(Map<Long, LocalResource> localResources);
+
+    /**
      * @return physical drive
      */
     IPhysicalDrive getPhysicalDrive();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
index fc4645d..1e986b3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
@@ -31,34 +31,31 @@
      * Notify registering a new resource
      * Note: this method is not thread-safe outside {@link org.apache.hyracks.storage.common.IResourceLifecycleManager}
      *
-     * @param datasetId     dataset ID
      * @param localResource resource to be registered
      * @param index         of the resource
+     * @param partition     partition
      */
-    void onRegister(int datasetId, LocalResource localResource, IIndex index);
+    void onRegister(LocalResource localResource, IIndex index, int partition);
 
     /**
      * Notify unregistering an existing resource
      * Note: this method is not thread-safe outside {@link org.apache.hyracks.storage.common.IResourceLifecycleManager}
      *
-     * @param datasetId  dataset ID
      * @param resourceId resource ID
      */
-    void onUnregister(int datasetId, long resourceId);
+    void onUnregister(long resourceId);
 
     /**
      * Notify opening a resource
      *
-     * @param datasetId  dataset ID
      * @param resourceId resource ID
      */
-    void onOpen(int datasetId, long resourceId);
+    void onOpen(long resourceId);
 
     /**
      * Notify closing a resource
      *
-     * @param datasetId  dataset ID
      * @param resourceId resource ID
      */
-    void onClose(int datasetId, long resourceId);
+    void onClose(long resourceId);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java
index 9a7a5d6..d6155a4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java
@@ -18,6 +18,10 @@
  */
 package org.apache.hyracks.storage.common.disk;
 
+import java.util.Map;
+
+import org.apache.hyracks.storage.common.LocalResource;
+
 public final class NoOpDiskCacheMonitoringService implements IDiskCacheMonitoringService {
     public static final IDiskCacheMonitoringService INSTANCE = new NoOpDiskCacheMonitoringService();
 
@@ -40,6 +44,11 @@
     }
 
     @Override
+    public void reportLocalResources(Map<Long, LocalResource> localResources) {
+        // NoOp
+    }
+
+    @Override
     public IPhysicalDrive getPhysicalDrive() {
         return DummyPhysicalDrive.INSTANCE;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
index 3c0d6c9..b83c388 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
@@ -28,22 +28,22 @@
     }
 
     @Override
-    public void onRegister(int datasetId, LocalResource localResource, IIndex index) {
+    public void onRegister(LocalResource localResource, IIndex index, int partition) {
         // NoOp
     }
 
     @Override
-    public void onUnregister(int datasetId, long resourceId) {
+    public void onUnregister(long resourceId) {
         // NoOp
     }
 
     @Override
-    public void onOpen(int datasetId, long resourceId) {
+    public void onOpen(long resourceId) {
         // NoOp
     }
 
     @Override
-    public void onClose(int datasetId, long resourceId) {
+    public void onClose(long resourceId) {
         // NoOp
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
index e54ce2c..0b4d2ed 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
@@ -26,6 +26,8 @@
 import org.apache.hyracks.storage.common.IResourceLifecycleManager;
 import org.apache.hyracks.storage.common.IStorageManager;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
+import org.apache.hyracks.storage.common.disk.NoOpDiskCacheMonitoringService;
 import org.apache.hyracks.storage.common.file.ResourceIdFactory;
 
 public class TestStorageManager implements IStorageManager {
@@ -60,4 +62,9 @@
         return TestStorageManagerComponentHolder.getIndexLifecycleManager();
     }
 
+    @Override
+    public IDiskCacheMonitoringService getDiskCacheMonitoringService(INCServiceContext ctx) {
+        return NoOpDiskCacheMonitoringService.INSTANCE;
+    }
+
 }