[MULTIPLE ISSUES][STO] Multiple fixes for cloud caching

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- ASTERIXDB-3424: Disk cache sweeper can interfere
                  with rebalance
- ASTERIXDB-3425: Use allocated storage quota to
                  determine whether or not to persist
                  non-column pages
- Other fixes:
  - Make punchHole() uninterruptible
  - Allow plan reevaluation despite the disk is
    being pressured or not
  - Log the page info when the buffer cache's
    page read operation fails
  - Fix API tests failures (ASTERIXDB-3423)
  - Ignore CloudStorageGCSTest

Change-Id: Ida79a61940ed8944ed9fb44ac7d8815d865b73e6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18361
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
index 547bc8b..b47703a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -55,16 +56,26 @@
         INcApplicationContext applicationContext = (INcApplicationContext) cs.getApplicationContext();
         PersistentLocalResourceRepository lrs =
                 (PersistentLocalResourceRepository) applicationContext.getLocalResourceRepository();
+        IDiskCacheMonitoringService diskService = applicationContext.getDiskCacheService();
 
-        String nodeId = applicationContext.getServiceContext().getNodeId();
-        LOGGER.info("Initializing Node {} with storage partitions: {}", nodeId, storagePartitions);
+        // Pause all disk caching activities
+        diskService.pause();
+        try {
+            String nodeId = applicationContext.getServiceContext().getNodeId();
+            LOGGER.info("Initializing Node {} with storage partitions: {}", nodeId, storagePartitions);
 
-        Checkpoint latestCheckpoint = applicationContext.getTransactionSubsystem().getCheckpointManager().getLatest();
-        IPartitionBootstrapper bootstrapper = applicationContext.getPartitionBootstrapper();
-        bootstrapper.bootstrap(storagePartitions, lrs.getOnDiskPartitions(), metadataNode, metadataPartitionId, cleanup,
-                latestCheckpoint == null);
-        // Report all local resources
-        applicationContext.getDiskCacheService().reportLocalResources(lrs.loadAndGetAllResources());
+            Checkpoint latestCheckpoint =
+                    applicationContext.getTransactionSubsystem().getCheckpointManager().getLatest();
+            IPartitionBootstrapper bootstrapper = applicationContext.getPartitionBootstrapper();
+            bootstrapper.bootstrap(storagePartitions, lrs.getOnDiskPartitions(), metadataNode, metadataPartitionId,
+                    cleanup, latestCheckpoint == null);
+
+            // Report all local resources
+            diskService.reportLocalResources(lrs.loadAndGetAllResources());
+        } finally {
+            // Resume all disk caching activities
+            diskService.resume();
+        }
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
index 89a4781..3a03445 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
@@ -36,6 +36,7 @@
 import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.FixMethodOrder;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.MethodSorters;
@@ -54,6 +55,7 @@
  */
 @RunWith(Parameterized.class)
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
+@Ignore
 public class CloudStorageGCSTest {
 
     private static final Logger LOGGER = LogManager.getLogger();
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 92df8f0..93b92bc 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -10,7 +10,7 @@
     "active\.suspend\.timeout" : 3600,
     "azure.request.timeout" : 120,
     "cloud.deployment" : false,
-    "cloud.profiler.log.interval" : 0,
+    "cloud.profiler.log.interval" : 5,
     "cloud.storage.allocation.percentage" : 0.8,
     "cloud.storage.anonymous.auth" : false,
     "cloud.storage.bucket" : "",
@@ -24,6 +24,7 @@
     "cloud.storage.region" : "",
     "cloud.storage.scheme" : "",
     "cloud.storage.sweep.threshold.percentage" : 0.9,
+    "cloud.write.buffer.size" : 8388608,
     "compiler\.arrayindex" : true,
     "compiler.batch.lookup" : true,
     "compiler.cbo" : true,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 4d37887..a5e70a5 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -10,7 +10,7 @@
     "active\.suspend\.timeout" : 3600,
     "azure.request.timeout" : 120,
     "cloud.deployment" : false,
-    "cloud.profiler.log.interval" : 0,
+    "cloud.profiler.log.interval" : 5,
     "cloud.storage.allocation.percentage" : 0.8,
     "cloud.storage.anonymous.auth" : false,
     "cloud.storage.bucket" : "",
@@ -24,6 +24,7 @@
     "cloud.storage.region" : "",
     "cloud.storage.scheme" : "",
     "cloud.storage.sweep.threshold.percentage" : 0.9,
+    "cloud.write.buffer.size" : 8388608,
     "compiler\.arrayindex" : true,
     "compiler.batch.lookup" : true,
     "compiler.cbo" : true,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 3189a3c..cc1db05 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -10,7 +10,7 @@
     "active\.suspend\.timeout" : 3600,
     "azure.request.timeout" : 120,
     "cloud.deployment" : false,
-    "cloud.profiler.log.interval" : 0,
+    "cloud.profiler.log.interval" : 5,
     "cloud.storage.allocation.percentage" : 0.8,
     "cloud.storage.anonymous.auth" : false,
     "cloud.storage.bucket" : "",
@@ -24,6 +24,7 @@
     "cloud.storage.region" : "",
     "cloud.storage.scheme" : "",
     "cloud.storage.sweep.threshold.percentage" : 0.9,
+    "cloud.write.buffer.size" : 8388608,
     "compiler\.arrayindex" : true,
     "compiler.batch.lookup" : true,
     "compiler.cbo" : true,
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
index d8aecc1..56eaeeb 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.control.nc.io.IOManager;
 
 public class SelectiveCloudAccessor extends ReplaceableCloudAccessor {
@@ -43,7 +44,14 @@
 
     @Override
     public int doPunchHole(IFileHandle fileHandle, long offset, long length) throws HyracksDataException {
-        return puncher.punchHole(fileHandle, offset, length);
+        try {
+            // Ensure that punching a hole cannot be interrupted
+            InvokeUtil.doExUninterruptibly(() -> puncher.punchHole(fileHandle, offset, length));
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+
+        return (int) length;
     }
 
     @Override
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
index eb125f3..91de5ae 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
@@ -56,22 +56,20 @@
         return new DebugHolePuncher(cloudIOManager, bufferProvider);
     }
 
-    private static int unsupported(IFileHandle fileHandle, long offset, long length) {
+    private static void unsupported(IFileHandle fileHandle, long offset, long length) {
         throw new UnsupportedOperationException("punchHole is not supported");
     }
 
-    private static int linuxPunchHole(IFileHandle fileHandle, long offset, long length) throws HyracksDataException {
+    private static void linuxPunchHole(IFileHandle fileHandle, long offset, long length) throws HyracksDataException {
         CloudFileHandle cloudFileHandle = (CloudFileHandle) fileHandle;
         int fileDescriptor = cloudFileHandle.getFileDescriptor();
         int blockSize = cloudFileHandle.getBlockSize();
-        int freedSpace = FileSystemOperationDispatcherUtil.punchHole(fileDescriptor, offset, length, blockSize);
+        FileSystemOperationDispatcherUtil.punchHole(fileDescriptor, offset, length, blockSize);
         try {
             cloudFileHandle.getFileChannel().force(false);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
-
-        return freedSpace;
     }
 
     private static final class DebugHolePuncher implements IHolePuncher {
@@ -84,9 +82,8 @@
         }
 
         @Override
-        public int punchHole(IFileHandle fileHandle, long offset, long length) throws HyracksDataException {
+        public void punchHole(IFileHandle fileHandle, long offset, long length) throws HyracksDataException {
             ByteBuffer buffer = acquireAndPrepareBuffer(length);
-            int totalWritten = 0;
             try {
                 long remaining = length;
                 long position = offset;
@@ -94,14 +91,11 @@
                     int written = cloudIOManager.localWriter(fileHandle, position, buffer);
                     position += written;
                     remaining -= written;
-                    totalWritten += written;
                     buffer.limit((int) Math.min(remaining, buffer.capacity()));
                 }
             } finally {
                 bufferProvider.recycle(buffer);
             }
-
-            return totalWritten;
         }
 
         private ByteBuffer acquireAndPrepareBuffer(long length) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/IHolePuncher.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/IHolePuncher.java
index 369c247..dc6981c 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/IHolePuncher.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/IHolePuncher.java
@@ -34,5 +34,5 @@
      * @param offset starting offset
      * @param length length
      */
-    int punchHole(IFileHandle file, long offset, long length) throws HyracksDataException;
+    void punchHole(IFileHandle file, long offset, long length) throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java
index ba513fa..4564856 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java
@@ -48,10 +48,21 @@
 
     @Override
     public void stop() {
+        monitorThread.pause();
         executor.shutdown();
     }
 
     @Override
+    public void pause() {
+        monitorThread.pause();
+    }
+
+    @Override
+    public void resume() {
+        monitorThread.resume();
+    }
+
+    @Override
     public boolean isEnabled() {
         return true;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
index e466758..1444354 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
@@ -25,6 +25,7 @@
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IDiskSpaceMaker;
@@ -49,6 +50,7 @@
     private final ICloudIOManager cloudIOManager;
     private final Sweeper sweeper;
     private final long inactiveTimeThreshold;
+    private final AtomicBoolean paused;
 
     public DiskCacheSweeperThread(ExecutorService executorService, long waitTime,
             CloudDiskResourceCacheLockNotifier resourceManager, ICloudIOManager cloudIOManager, int numOfSweepThreads,
@@ -60,14 +62,30 @@
         this.inactiveTimeThreshold = inactiveTimeThreshold;
         indexes = new ArrayList<>();
         this.cloudIOManager = cloudIOManager;
-        sweeper = new Sweeper(executorService, cloudIOManager, bufferCache, fileInfoMap, numOfSweepThreads,
+        paused = new AtomicBoolean();
+        sweeper = new Sweeper(paused, executorService, cloudIOManager, bufferCache, fileInfoMap, numOfSweepThreads,
                 sweepQueueSize);
     }
 
+    public void pause() {
+        LOGGER.info("Pausing Sweeper threads...");
+        // Synchronize to ensure the thread is not sweeping
+        synchronized (this) {
+            // Set paused to ignore all future sweep requests
+            paused.set(true);
+        }
+        sweeper.waitForRunningRequests();
+        LOGGER.info("Sweeper threads have been paused successfully");
+    }
+
     public void reportLocalResources(Map<Long, LocalResource> localResources) {
         resourceManager.reportLocalResources(localResources);
     }
 
+    public void resume() {
+        paused.set(false);
+    }
+
     @Override
     public void makeSpaceOrThrow(IOException ioException) throws HyracksDataException {
         if (ioException.getMessage().contains("no space")) {
@@ -95,15 +113,23 @@
                     makeSpace();
                     wait(waitTime);
                 } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
+                    LOGGER.warn("DiskCacheSweeperThread interrupted", e);
+                    break;
                 } finally {
                     notifyAll();
                 }
             }
         }
+
+        Thread.currentThread().interrupt();
     }
 
     private void makeSpace() {
+        if (paused.get()) {
+            LOGGER.warn("DiskCacheSweeperThread is paused");
+            return;
+        }
+
         if (physicalDrive.computeAndCheckIsPressured()) {
             boolean shouldSweep;
             resourceManager.getEvictionLock().writeLock().lock();
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/PhysicalDrive.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/PhysicalDrive.java
index e4fefa5..0253b65 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/PhysicalDrive.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/PhysicalDrive.java
@@ -26,7 +26,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IODeviceHandle;
@@ -40,37 +40,45 @@
 public final class PhysicalDrive implements IPhysicalDrive {
     private static final Logger LOGGER = LogManager.getLogger();
     private final List<FileStore> drivePaths;
-    private final long pressureSize;
-    private final AtomicBoolean pressured;
+    private final DiskSpace diskSpace;
+    private final AtomicLong usedSpace;
 
     public PhysicalDrive(List<IODeviceHandle> deviceHandles, double pressureThreshold, double storagePercentage,
             long pressureDebugSize) throws HyracksDataException {
         drivePaths = getDrivePaths(deviceHandles);
-        pressureSize = getPressureSize(drivePaths, pressureThreshold, storagePercentage, pressureDebugSize);
-        pressured = new AtomicBoolean();
+        diskSpace = getPressureSize(drivePaths, pressureThreshold, storagePercentage, pressureDebugSize);
+        usedSpace = new AtomicLong();
         computeAndCheckIsPressured();
     }
 
     @Override
     public boolean computeAndCheckIsPressured() {
         long usedSpace = getUsedSpace();
-        boolean isPressured = usedSpace > pressureSize;
-        pressured.set(isPressured);
+        long pressureCapacity = diskSpace.getPressureCapacity();
+        boolean isPressured = usedSpace > pressureCapacity;
+        this.usedSpace.set(usedSpace);
 
         if (isPressured) {
             LOGGER.info("Used space: {}, pressureCapacity: {} (isPressured: {})",
-                    StorageUtil.toHumanReadableSize(usedSpace), StorageUtil.toHumanReadableSize(pressureSize), true);
+                    StorageUtil.toHumanReadableSize(usedSpace), StorageUtil.toHumanReadableSize(pressureCapacity),
+                    true);
         } else {
             LOGGER.debug("Used space: {}, pressureCapacity: {} (isPressured: {})",
-                    StorageUtil.toHumanReadableSize(usedSpace), StorageUtil.toHumanReadableSize(pressureSize), false);
+                    StorageUtil.toHumanReadableSize(usedSpace), StorageUtil.toHumanReadableSize(pressureCapacity),
+                    false);
         }
 
         return isPressured;
     }
 
     @Override
+    public boolean isUnpressured() {
+        return usedSpace.get() <= diskSpace.getPressureCapacity();
+    }
+
+    @Override
     public boolean hasSpace() {
-        return !pressured.get();
+        return usedSpace.get() < diskSpace.getPressureCapacity();
     }
 
     private long getUsedSpace() {
@@ -86,8 +94,8 @@
         return totalUsedSpace;
     }
 
-    private static long getPressureSize(List<FileStore> drivePaths, double pressureThreshold, double storagePercentage,
-            long pressureDebugSize) throws HyracksDataException {
+    private static DiskSpace getPressureSize(List<FileStore> drivePaths, double pressureThreshold,
+            double storagePercentage, long pressureDebugSize) throws HyracksDataException {
 
         long totalCapacity = 0;
         long totalUsedSpace = 0;
@@ -106,7 +114,7 @@
                 StorageUtil.toHumanReadableSize(totalCapacity), StorageUtil.toHumanReadableSize(allocatedCapacity),
                 StorageUtil.toHumanReadableSize(pressureCapacity), StorageUtil.toHumanReadableSize(totalUsedSpace));
 
-        return pressureCapacity;
+        return new DiskSpace(allocatedCapacity, pressureCapacity);
     }
 
     private static List<FileStore> getDrivePaths(List<IODeviceHandle> deviceHandles) throws HyracksDataException {
@@ -146,4 +154,22 @@
             throw HyracksDataException.create(e);
         }
     }
+
+    private static class DiskSpace {
+        private final long allocatedCapacity;
+        private final long pressureCapacity;
+
+        private DiskSpace(long allocatedCapacity, long pressureCapacity) {
+            this.allocatedCapacity = allocatedCapacity;
+            this.pressureCapacity = pressureCapacity;
+        }
+
+        public long getAllocatedCapacity() {
+            return allocatedCapacity;
+        }
+
+        public long getPressureCapacity() {
+            return pressureCapacity;
+        }
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
index c709cc6..973d9a1 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
@@ -35,16 +35,16 @@
     private final ICloudIOManager cloudIOManager;
     private final BufferCache bufferCache;
     private final Map<Integer, BufferedFileHandle> fileInfoMap;
-    private final AtomicBoolean shutdown;
+    private final AtomicBoolean stop;
     private SweepableIndexUnit indexUnit;
     private BufferedFileHandle handle;
 
     public SweepContext(ICloudIOManager cloudIOManager, BufferCache bufferCache,
-            Map<Integer, BufferedFileHandle> fileInfoMap, AtomicBoolean shutdown) {
+            Map<Integer, BufferedFileHandle> fileInfoMap, AtomicBoolean stop) {
         this.cloudIOManager = cloudIOManager;
         this.bufferCache = bufferCache;
         this.fileInfoMap = fileInfoMap;
-        this.shutdown = shutdown;
+        this.stop = stop;
     }
 
     @Override
@@ -97,6 +97,6 @@
      * @return true if it should stop, false otherwise
      */
     public boolean stopSweeping() {
-        return shutdown.get() || indexUnit.isDropped();
+        return stop.get() || indexUnit.isDropped();
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
index 6e12b79..49174bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
@@ -37,18 +37,19 @@
 
 public final class Sweeper {
     private static final Logger LOGGER = LogManager.getLogger();
-    private static final SweepRequest POISON = new SweepRequest();
+    private final int queueSize;
     private final BlockingQueue<SweepRequest> requests;
     private final BlockingQueue<SweepRequest> freeRequests;
-    private final AtomicBoolean shutdown;
 
-    public Sweeper(ExecutorService executor, ICloudIOManager cloudIOManager, BufferCache bufferCache,
-            Map<Integer, BufferedFileHandle> fileInfoMap, int numOfSweepThreads, int queueSize) {
+    public Sweeper(AtomicBoolean paused, ExecutorService executor, ICloudIOManager cloudIOManager,
+            BufferCache bufferCache, Map<Integer, BufferedFileHandle> fileInfoMap, int numOfSweepThreads,
+            int queueSize) {
+        this.queueSize = queueSize;
         requests = new ArrayBlockingQueue<>(queueSize);
         freeRequests = new ArrayBlockingQueue<>(queueSize);
-        shutdown = new AtomicBoolean(false);
         for (int i = 0; i < queueSize; i++) {
-            freeRequests.add(new SweepRequest(new SweepContext(cloudIOManager, bufferCache, fileInfoMap, shutdown)));
+            SweepContext context = new SweepContext(cloudIOManager, bufferCache, fileInfoMap, paused);
+            freeRequests.add(new SweepRequest(context));
         }
         for (int i = 0; i < numOfSweepThreads; i++) {
             executor.execute(new SweepThread(requests, freeRequests, i));
@@ -61,12 +62,14 @@
         requests.put(request);
     }
 
-    public void shutdown() {
-        shutdown.set(true);
-        requests.clear();
-        freeRequests.clear();
-        requests.offer(POISON);
-        // TODO wait for threads to terminate
+    public void waitForRunningRequests() {
+        // Wait for all running requests if any
+        while (freeRequests.size() != queueSize) {
+            synchronized (freeRequests) {
+                LOGGER.warn("Waiting for {} running sweep requests", queueSize - freeRequests.size());
+                InvokeUtil.doUninterruptibly(freeRequests::wait);
+            }
+        }
     }
 
     private static class SweepThread implements Runnable {
@@ -88,44 +91,31 @@
                 SweepRequest request = null;
                 try {
                     request = requests.take();
-                    if (isPoison(request)) {
-                        break;
-                    }
                     request.handle();
                 } catch (InterruptedException e) {
-                    LOGGER.warn("Ignoring interrupt. Sweep threads should never be interrupted.");
+                    LOGGER.warn("Sweep thread interrupted");
+                    break;
                 } catch (Throwable t) {
                     LOGGER.error("Sweep failed", t);
                 } finally {
-                    if (request != null && request != POISON) {
+                    if (request != null) {
                         freeRequests.add(request);
                     }
-                }
 
-            }
-        }
-
-        private boolean isPoison(SweepRequest request) {
-            if (request == POISON) {
-                LOGGER.info("Exiting");
-                InvokeUtil.doUninterruptibly(() -> requests.put(POISON));
-                if (Thread.interrupted()) {
-                    LOGGER.error("Ignoring interrupt. Sweep threads should never be interrupted.");
+                    synchronized (freeRequests) {
+                        // Notify if pause() is waiting for all requests to finish
+                        freeRequests.notify();
+                    }
                 }
-                return true;
             }
 
-            return false;
+            Thread.currentThread().interrupt();
         }
     }
 
     private static class SweepRequest {
         private final SweepContext context;
 
-        SweepRequest() {
-            this(null);
-        }
-
         SweepRequest(SweepContext context) {
             this.context = context;
         }
@@ -150,6 +140,7 @@
                  */
                 return;
             }
+
             SweepableIndexUnit indexUnit = context.getIndexUnit();
             indexUnit.startSweeping();
             try {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/CloudColumnIndexDiskCacheManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/CloudColumnIndexDiskCacheManager.java
index 38a1713..7187496 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/CloudColumnIndexDiskCacheManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/CloudColumnIndexDiskCacheManager.java
@@ -71,7 +71,7 @@
     public IColumnReadContext createReadContext(IColumnProjectionInfo projectionInfo) {
         ColumnProjectorType projectorType = projectionInfo.getProjectorType();
         if (projectorType == ColumnProjectorType.QUERY) {
-            planner.access(projectionInfo, drive.hasSpace());
+            planner.access(projectionInfo);
         } else if (projectorType == ColumnProjectorType.MODIFY) {
             planner.setIndexedColumns(projectionInfo);
             // Requested (and indexed) columns will be persisted if space permits
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
index f28b2fd..2679d33 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
@@ -114,13 +114,13 @@
              * - The page is not being evicted (cloudOnly)
              * - The page is not planned for eviction (evictable)
              * - The operation is not a merge operation (the component will be deleted anyway)
-             * - The disk has space
+             * - The disk is not pressured
              *
              * Note: 'empty' can be false while 'cloudOnly is true'. We cannot read from disk as the page can be
              * evicted at any moment. In other words, the sweeper told us that it is going to evict this page; hence
              * 'cloudOnly' is true.
              */
-            boolean persist = empty && !cloudOnly && !evictable && operation != MERGE && drive.hasSpace();
+            boolean persist = empty && !cloudOnly && !evictable && operation != MERGE && drive.isUnpressured();
             readFromStream(ioManager, fileHandle, header, cPage, persist);
         } else {
             /*
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/write/CloudColumnWriteContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/write/CloudColumnWriteContext.java
index dcde833..604d6cd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/write/CloudColumnWriteContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/write/CloudColumnWriteContext.java
@@ -67,8 +67,8 @@
 
     @Override
     public void startWritingColumn(int columnIndex, boolean overlapping) {
-        if (drive.hasSpace() || indexedColumns.contains(columnIndex)) {
-            // The current column should be persisted locally if free disk space permits
+        if (drive.isUnpressured() || indexedColumns.contains(columnIndex)) {
+            // The current column will be persisted locally if free disk has space (drive.hasSpace() returns true)
             currentContext = DefaultBufferCacheWriteContext.INSTANCE;
         } else if (plan.get(columnIndex)) {
             // This column was planned for eviction, do not persist.
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java
index 9549f4f..3875812 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java
@@ -64,7 +64,6 @@
     private long[] lastAccesses;
 
     private double punchableThreshold;
-    private long lastSweepTs;
     private int numberOfSweptColumns;
     private int numberOfCloudRequests;
 
@@ -107,16 +106,16 @@
         return new IntOpenHashSet(indexedColumns);
     }
 
-    public synchronized void access(IColumnProjectionInfo projectionInfo, boolean hasSpace) {
-        resetPlanIfNeeded(hasSpace);
+    public synchronized void access(IColumnProjectionInfo projectionInfo) {
+        resetPlanIfNeeded();
         long accessTime = clock.getCurrentTime();
         lastAccess = accessTime;
         int numberOfColumns = projectionInfo.getNumberOfProjectedColumns();
         boolean requireCloudAccess = false;
         for (int i = 0; i < numberOfColumns; i++) {
             int columnIndex = projectionInfo.getColumnIndex(i);
+            // columnIndex can be -1 when accessing a non-existing column (i.e., not known by the schema)
             if (columnIndex >= 0) {
-                // columnIndex can be -1 when accessing a non-existing column (i.e., not known by the schema)
                 lastAccesses[columnIndex] = accessTime;
                 requireCloudAccess |= numberOfSweptColumns > 0 && plan.get(columnIndex);
             }
@@ -152,8 +151,6 @@
             punchableThreshold *= PUNCHABLE_THRESHOLD_DECREMENT;
             iter++;
         }
-        // Register the plan time
-        lastSweepTs = clock.getCurrentTime();
         // Add the number of evictable columns
         numberOfSweptColumns += numberOfEvictableColumns;
         if (numberOfEvictableColumns > 0) {
@@ -237,8 +234,8 @@
         return numberOfEvictableColumns;
     }
 
-    private void resetPlanIfNeeded(boolean hasSpace) {
-        if (!hasSpace || numberOfCloudRequests < REEVALUATE_PLAN_THRESHOLD) {
+    private void resetPlanIfNeeded() {
+        if (numberOfCloudRequests < REEVALUATE_PLAN_THRESHOLD) {
             return;
         }
 
@@ -249,10 +246,10 @@
             int columnIndex = reevaluatedPlan.nextSetBit(i);
             if (!plan.get(columnIndex)) {
                 // the plan contains a stale column. Invalidate!
+                LOGGER.info("Re-planning to evict {} columns. Old plan: {} new plan: {}", numberOfEvictableColumns,
+                        plan, reevaluatedPlan);
                 plan.clear();
                 plan.or(reevaluatedPlan);
-                LOGGER.info("Re-planning to evict {} columns. The newly evictable columns are {}",
-                        numberOfEvictableColumns, plan);
                 break;
             }
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlannerTest.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlannerTest.java
index 6bace98..a5a87c4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlannerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlannerTest.java
@@ -56,7 +56,7 @@
         // Project 3 columns
         projectedColumns(numberOfPrimaryKeys, numberOfColumns, 3, projectedColumns);
         // access the projected columns (max 10 times)
-        access(planner, info, true, 10);
+        access(planner, info, 10);
 
         // Advance clock
         clock.advance(10);
@@ -69,7 +69,7 @@
         // Project another 3 columns
         projectedColumns(numberOfPrimaryKeys, numberOfColumns, 3, projectedColumns);
         // access the projected columns
-        access(planner, info, true, 100);
+        access(planner, info, 100);
 
         // At this point, the plan should change
         BitSet newKeptColumns = new BitSet();
@@ -89,10 +89,10 @@
         System.out.println("Kept columns: " + keptColumns);
     }
 
-    private void access(ColumnSweepPlanner planner, DummyColumnProjectionInfo info, boolean hasSpace, int bound) {
+    private void access(ColumnSweepPlanner planner, DummyColumnProjectionInfo info, int bound) {
         int numberOfAccesses = RANDOM.nextInt(1, bound);
         for (int i = 0; i < numberOfAccesses; i++) {
-            planner.access(info, hasSpace);
+            planner.access(info);
             clock.advance(1);
         }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 271afad..d2a6fc0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -567,7 +567,13 @@
     private void read(CachedPage cPage, IBufferCacheReadContext context) throws HyracksDataException {
         BufferedFileHandle fInfo = getFileHandle(cPage);
         cPage.buffer.clear();
-        fInfo.read(cPage, context);
+        try {
+            fInfo.read(cPage, context);
+        } catch (Throwable e) {
+            LOGGER.error("Error while reading a page {} in file {}", cPage, fInfo);
+            throw e;
+        }
+
         final IThreadStats threadStats = statsSubscribers.get(Thread.currentThread());
         if (threadStats != null && context.incrementStats()) {
             threadStats.coldRead();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/DummyPhysicalDrive.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/DummyPhysicalDrive.java
index af4162b..ad9a940 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/DummyPhysicalDrive.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/DummyPhysicalDrive.java
@@ -30,6 +30,11 @@
     }
 
     @Override
+    public boolean isUnpressured() {
+        return false;
+    }
+
+    @Override
     public boolean hasSpace() {
         return true;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java
index b957782..4151c77 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java
@@ -37,6 +37,16 @@
     void stop();
 
     /**
+     * Pause all disk caching activities
+     */
+    void pause();
+
+    /**
+     * Resume all disk caching activities
+     */
+    void resume();
+
+    /**
      * @return whether the monitoring service is enabled
      */
     boolean isEnabled();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IPhysicalDrive.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IPhysicalDrive.java
index d2aaec1..6818140 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IPhysicalDrive.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IPhysicalDrive.java
@@ -33,7 +33,13 @@
     boolean computeAndCheckIsPressured();
 
     /**
-     * @return if drive is still has free space (i.e., not pressured)
+     * @return true if the drive is not pressured, false otherwise
+     */
+    boolean isUnpressured();
+
+    /**
+     * @return true if the disk has free space, false otherwise
+     * Note: disk could be pressured but still has free space for storage
      */
     boolean hasSpace();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java
index d6155a4..5cbf5cb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java
@@ -39,6 +39,16 @@
     }
 
     @Override
+    public void pause() {
+        // NoOp
+    }
+
+    @Override
+    public void resume() {
+        // NoOp
+    }
+
+    @Override
     public boolean isEnabled() {
         return false;
     }