[ASTERIXDB-3375][STO][HYR] Introduce local disk caching APIs

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Introduce hyracks-cloud and local disk caching APIs

Change-Id: I8a1ce6616d1190f8b047570fac1d774297a3d7de
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18227
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index caf862b..7a4a33b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -112,10 +112,12 @@
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.buffercache.BufferCache;
 import org.apache.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
+import org.apache.hyracks.storage.common.buffercache.DefaultDiskCachedPageAllocator;
 import org.apache.hyracks.storage.common.buffercache.DelayPageCleanerPolicy;
 import org.apache.hyracks.storage.common.buffercache.HeapBufferAllocator;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
+import org.apache.hyracks.storage.common.buffercache.IDiskCachedPageAllocator;
 import org.apache.hyracks.storage.common.buffercache.IPageCleanerPolicy;
 import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
 import org.apache.hyracks.storage.common.buffercache.context.page.DefaultBufferCachePageOperationContextProvider;
@@ -209,6 +211,7 @@
             IConfigValidatorFactory configValidatorFactory, IReplicationStrategyFactory replicationStrategyFactory,
             boolean initialRun) throws IOException {
         ioManager = getServiceContext().getIoManager();
+        IDiskCachedPageAllocator pageAllocator = DefaultDiskCachedPageAllocator.INSTANCE;
         IBufferCacheReadContext defaultContext = DefaultBufferCachePageOperationContextProvider.DEFAULT;
         if (isCloudDeployment()) {
             persistenceIOManager =
@@ -221,9 +224,9 @@
         int ioQueueLen = getServiceContext().getAppConfig().getInt(NCConfig.Option.IO_QUEUE_SIZE);
         threadExecutor =
                 MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory());
-        ICacheMemoryAllocator allocator = new HeapBufferAllocator();
+        ICacheMemoryAllocator bufferAllocator = new HeapBufferAllocator();
         IPageCleanerPolicy pcp = new DelayPageCleanerPolicy(600000);
-        IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator,
+        IPageReplacementStrategy prs = new ClockPageReplacementStrategy(bufferAllocator, pageAllocator,
                 storageProperties.getBufferCachePageSize(), storageProperties.getBufferCacheNumPages());
         lsmIOScheduler = createIoScheduler(storageProperties);
         metadataMergePolicyFactory = new ConcurrentMergePolicyFactory();
@@ -254,7 +257,7 @@
                         maxScheduledFlushes);
             }
         }
-        virtualBufferCache = new GlobalVirtualBufferCache(allocator, storageProperties, maxScheduledFlushes);
+        virtualBufferCache = new GlobalVirtualBufferCache(bufferAllocator, storageProperties, maxScheduledFlushes);
         // Must start vbc now instead of by life cycle component manager (lccm) because lccm happens after
         // the metadata bootstrap task
         ((ILifeCycleComponent) virtualBufferCache).start();
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
index c826ed7..3f91c75 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
@@ -28,16 +28,21 @@
  * Only used for files which are stored inside an IO device.
  */
 public class FileReference implements Serializable {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
     private final File file;
     private final IODeviceHandle dev;
     private final String path;
     private long registrationTime = 0L;
+    /**
+     * Used to determine if holes can exist in this file
+     */
+    private boolean holesAllowed;
 
     public FileReference(IODeviceHandle dev, String path) {
         file = new File(dev.getMount(), path);
         this.dev = dev;
         this.path = path;
+        holesAllowed = false;
     }
 
     public File getFile() {
@@ -132,4 +137,15 @@
         String parentPath = path.substring(0, parentIndex);
         return new FileReference(dev, parentPath);
     }
+
+    public boolean areHolesAllowed() {
+        return holesAllowed;
+    }
+
+    /**
+     * Set the file to allow holes. Once set, it cannot be changed.
+     */
+    public void setHolesAllowed() {
+        this.holesAllowed = true;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IDiskSpaceMaker.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IDiskSpaceMaker.java
new file mode 100644
index 0000000..b5b912e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IDiskSpaceMaker.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.io;
+
+import java.io.IOException;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IDiskSpaceMaker {
+    void makeSpaceOrThrow(IOException e) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/pom.xml b/hyracks-fullstack/hyracks/hyracks-cloud/pom.xml
new file mode 100644
index 0000000..6ff2ef6
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/pom.xml
@@ -0,0 +1,91 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>hyracks-cloud</artifactId>
+  <parent>
+    <groupId>org.apache.hyracks</groupId>
+    <artifactId>hyracks</artifactId>
+    <version>0.3.10-SNAPSHOT</version>
+  </parent>
+  <licenses>
+    <license>
+      <name>Apache License, Version 2.0</name>
+      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+      <distribution>repo</distribution>
+      <comments>A business-friendly OSS license</comments>
+    </license>
+  </licenses>
+  <properties>
+    <root.dir>${basedir}/../..</root.dir>
+    <jnr-posix.version>3.1.19</jnr-posix.version>
+    <jnr-ffi.version>2.2.16</jnr-ffi.version>
+  </properties>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-api</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-storage-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-storage-am-lsm-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-control-nc</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-util</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.github.jnr</groupId>
+      <artifactId>jnr-posix</artifactId>
+      <version>${jnr-posix.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.github.jnr</groupId>
+      <artifactId>jnr-ffi</artifactId>
+      <version>${jnr-ffi.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+    </dependency>
+  </dependencies>
+</project>
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/ContextUtil.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/ContextUtil.java
new file mode 100644
index 0000000..dfcd6f1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/ContextUtil.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.buffercache.context;
+
+import static org.apache.hyracks.storage.common.buffercache.IBufferCache.RESERVED_HEADER_BYTES;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
+
+public class ContextUtil {
+    private ContextUtil() {
+    }
+
+    public static boolean isEmpty(BufferCacheHeaderHelper header) {
+        ByteBuffer headerBuf = header.getBuffer();
+        // THIS IS ONLY VALID FOR COLUMNAR
+        return headerBuf.getInt(BufferCacheHeaderHelper.FRAME_MULTIPLIER_OFF) == 0;
+    }
+
+    public static void persist(ICloudIOManager cloudIOManager, IFileHandle fileHandle, ByteBuffer buffer, long offset)
+            throws HyracksDataException {
+        int originalLimit = buffer.limit();
+        buffer.position(RESERVED_HEADER_BYTES);
+
+        // First write the content of the page
+        cloudIOManager.localWriter(fileHandle, offset + RESERVED_HEADER_BYTES, buffer);
+
+        // If a failure happened before this position, we are sure the header still has 0 for
+        // the multiplier (i.e., a hole)
+
+        // Next, write the header. This is like a "commit" for the page
+        buffer.position(0);
+        buffer.limit(RESERVED_HEADER_BYTES);
+        // TODO what if this failed to write fully? (e.g., it wrote the first 3 bytes of the multiplier)
+        cloudIOManager.localWriter(fileHandle, offset, buffer);
+
+        // After this point the header is written. We are sure the page is valid and has the correct multiplier
+
+        // Restore the page's position and limit
+        buffer.position(0);
+        buffer.limit(originalLimit);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
new file mode 100644
index 0000000..c4ae6c0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.buffercache.context;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.common.buffercache.context.page.IBufferCacheWriteContext;
+
+public final class DefaultCloudOnlyWriteContext implements IBufferCacheWriteContext {
+    public static final IBufferCacheWriteContext INSTANCE = new DefaultCloudOnlyWriteContext();
+
+    private DefaultCloudOnlyWriteContext() {
+    }
+
+    @Override
+    public int write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer data)
+            throws HyracksDataException {
+        ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
+        return cloudIOManager.cloudWrite(handle, offset, data);
+    }
+
+    @Override
+    public long write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer[] data)
+            throws HyracksDataException {
+        ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
+        return cloudIOManager.cloudWrite(handle, offset, data);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudReadContext.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudReadContext.java
new file mode 100644
index 0000000..8dfd8ce
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudReadContext.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.buffercache.context;
+
+import static org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper.EXTRA_BLOCK_PAGE_ID_OFF;
+import static org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper.FRAME_MULTIPLIER_OFF;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.context.page.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+/**
+ * Default context for {@link BufferCache#pin(long, IBufferCacheReadContext)}
+ * and {@link BufferCache#unpin(ICachedPage, IBufferCacheReadContext)} in a cloud deployment.
+ * <p>
+ * The default behavior of this context is persisting the pages read from cloud if {@link IPhysicalDrive}
+ * reports that the local drive(s) are not pressured.
+ */
+@ThreadSafe
+public class DefaultCloudReadContext implements IBufferCacheReadContext {
+    private final IPhysicalDrive drive;
+
+    public DefaultCloudReadContext(IPhysicalDrive drive) {
+        this.drive = drive;
+    }
+
+    @Override
+    public void onPin(ICachedPage page) {
+        // NoOp
+    }
+
+    @Override
+    public void onUnpin(ICachedPage page) {
+        // NoOp
+    }
+
+    @Override
+    public boolean isNewPage() {
+        return false;
+    }
+
+    @Override
+    public boolean incrementStats() {
+        return true;
+    }
+
+    @Override
+    public ByteBuffer processHeader(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
+            CachedPage cPage) throws HyracksDataException {
+        return readAndPersistPage(ioManager, fileHandle, header, cPage, drive.hasSpace());
+    }
+
+    public static ByteBuffer readAndPersistPage(IOManager ioManager, BufferedFileHandle fileHandle,
+            BufferCacheHeaderHelper header, CachedPage cPage, boolean persist) throws HyracksDataException {
+        ByteBuffer headerBuf = readAndPersistIfEmpty(ioManager, fileHandle.getFileHandle(), header, cPage, persist);
+
+        cPage.setFrameSizeMultiplier(headerBuf.getInt(FRAME_MULTIPLIER_OFF));
+        cPage.setExtraBlockPageId(headerBuf.getInt(EXTRA_BLOCK_PAGE_ID_OFF));
+        headerBuf.position(BufferCache.RESERVED_HEADER_BYTES);
+        return headerBuf;
+    }
+
+    /**
+     * Note: {@link BufferCache} guarantees that no two reads on the same page can happen at the same time.
+     * This means persisting the page on disk here is guaranteed to be done by a single thread.
+     *
+     * @param ioManager  ioManager (guaranteed to be {@link ICloudIOManager}
+     * @param fileHandle file
+     * @param header     header buffer provider
+     * @param cPage      cached page
+     * @return header buffer
+     */
+    private static ByteBuffer readAndPersistIfEmpty(IOManager ioManager, IFileHandle fileHandle,
+            BufferCacheHeaderHelper header, CachedPage cPage, boolean persist) throws HyracksDataException {
+        ByteBuffer headerBuf = header.getBuffer();
+        if (ContextUtil.isEmpty(header)) {
+            // header indicates the page is empty
+            // reset the buffer position to 0. Limit should be already set before the call of processHeader
+            headerBuf.position(0);
+            long offset = cPage.getCompressedPageOffset();
+            ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
+            // Read pageZero from the cloud
+            cloudIOManager.cloudRead(fileHandle, offset, headerBuf);
+            headerBuf.flip();
+
+            if (persist) {
+                ContextUtil.persist(cloudIOManager, fileHandle, headerBuf, offset);
+            }
+        }
+
+        return headerBuf;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/page/CloudCachedPage.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/page/CloudCachedPage.java
new file mode 100644
index 0000000..76d937c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/page/CloudCachedPage.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.buffercache.page;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
+
+public final class CloudCachedPage extends CachedPage {
+    private ISweepLockInfo lockInfo;
+
+    private CloudCachedPage() {
+        // Disable default constructor
+    }
+
+    CloudCachedPage(int cpid, ByteBuffer buffer, IPageReplacementStrategy replacementStrategy) {
+        super(cpid, buffer, replacementStrategy);
+        lockInfo = UnlockedSweepLockInfo.INSTANCE;
+    }
+
+    public ISweepLockInfo beforeRead() {
+        latch.readLock().lock();
+        return lockInfo;
+    }
+
+    public void afterRead() {
+        latch.readLock().unlock();
+    }
+
+    public boolean trySweepLock(ISweepLockInfo lockInfo) {
+        if (!latch.writeLock().tryLock()) {
+            return false;
+        }
+
+        try {
+            this.lockInfo = lockInfo;
+        } finally {
+            latch.writeLock().unlock();
+        }
+
+        return true;
+    }
+
+    public void sweepUnlock() {
+        latch.writeLock().lock();
+        try {
+            this.lockInfo = UnlockedSweepLockInfo.INSTANCE;
+        } finally {
+            latch.writeLock().unlock();
+        }
+    }
+
+    public boolean skipCloudStream() {
+        return valid;
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/page/CloudDiskCachedPageAllocator.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/page/CloudDiskCachedPageAllocator.java
new file mode 100644
index 0000000..d9c91f3
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/page/CloudDiskCachedPageAllocator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.buffercache.page;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.IDiskCachedPageAllocator;
+import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
+
+public final class CloudDiskCachedPageAllocator implements IDiskCachedPageAllocator {
+    public static final IDiskCachedPageAllocator INSTANCE = new CloudDiskCachedPageAllocator();
+
+    private CloudDiskCachedPageAllocator() {
+    }
+
+    @Override
+    public CachedPage allocate(int cpid, ByteBuffer buffer, IPageReplacementStrategy replacementStrategy) {
+        return new CloudCachedPage(cpid, buffer, replacementStrategy);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/page/ISweepLockInfo.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/page/ISweepLockInfo.java
new file mode 100644
index 0000000..a90c74d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/page/ISweepLockInfo.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.buffercache.page;
+
+/**
+ * Sweep lock information holder of {@link CloudCachedPage}
+ */
+public interface ISweepLockInfo {
+    /**
+     * @return true if {@link CloudCachedPage} is locked for a sweep operation
+     */
+    boolean isLocked();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/page/UnlockedSweepLockInfo.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/page/UnlockedSweepLockInfo.java
new file mode 100644
index 0000000..e7c5f57
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/page/UnlockedSweepLockInfo.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.buffercache.page;
+
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+/**
+ * The default information holder when unlocked
+ */
+@ThreadSafe
+final class UnlockedSweepLockInfo implements ISweepLockInfo {
+    public static final ISweepLockInfo INSTANCE = new UnlockedSweepLockInfo();
+
+    @Override
+    public boolean isLocked() {
+        return false;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java
new file mode 100644
index 0000000..381d5eb
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.cache.service;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.disk.prefetch.AbstractPrefetchRequest;
+import org.apache.hyracks.storage.common.disk.prefetch.IPrefetchHandler;
+
+public final class CloudDiskCacheMonitoringAndPrefetchingService
+        implements IDiskCacheMonitoringService, IPrefetchHandler {
+    private final ExecutorService executor;
+    private final DiskCacheSweeperThread monitorThread;
+    private final IPhysicalDrive drive;
+
+    public CloudDiskCacheMonitoringAndPrefetchingService(ExecutorService executor, IPhysicalDrive drive,
+            DiskCacheSweeperThread monitorThread) {
+        this.executor = executor;
+        this.drive = drive;
+        this.monitorThread = monitorThread;
+    }
+
+    @Override
+    public void start() {
+        executor.execute(monitorThread);
+    }
+
+    @Override
+    public void stop() {
+        executor.shutdown();
+    }
+
+    @Override
+    public boolean isEnabled() {
+        return true;
+    }
+
+    @Override
+    public IPhysicalDrive getPhysicalDrive() {
+        return drive;
+    }
+
+    @Override
+    public void request(AbstractPrefetchRequest request) throws HyracksDataException {
+        // TODO implement
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
new file mode 100644
index 0000000..004c5f5
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.cache.service;
+
+import org.apache.hyracks.cloud.cache.unit.DatasetUnit;
+import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.LocalResource;
+import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
+// TODO locking should be revised
+public final class CloudDiskResourceCacheLockNotifier implements IDiskResourceCacheLockNotifier {
+    private final Int2ObjectMap<DatasetUnit> datasets;
+
+    public CloudDiskResourceCacheLockNotifier() {
+        datasets = Int2ObjectMaps.synchronize(new Int2ObjectOpenHashMap<>());
+    }
+
+    @Override
+    public void onRegister(int datasetId, LocalResource localResource, IIndex index) {
+        ILSMIndex lsmIndex = (ILSMIndex) index;
+        if (lsmIndex.getDiskCacheManager().isSweepable()) {
+            DatasetUnit datasetUnit = datasets.computeIfAbsent(datasetId, DatasetUnit::new);
+            datasetUnit.addIndex(localResource.getId(), lsmIndex);
+        }
+    }
+
+    @Override
+    public void onUnregister(int datasetId, long resourceId) {
+        DatasetUnit datasetUnit = datasets.get(datasetId);
+        if (datasetUnit != null && datasetUnit.dropIndex(resourceId)) {
+            datasets.remove(datasetId);
+        }
+    }
+
+    @Override
+    public void onOpen(int datasetId, long resourceId) {
+        DatasetUnit datasetUnit = datasets.get(datasetId);
+        if (datasetUnit != null) {
+            IndexUnit indexUnit = datasetUnit.getIndex(resourceId);
+            if (indexUnit != null) {
+                indexUnit.readLock();
+            }
+        }
+    }
+
+    @Override
+    public void onClose(int datasetId, long resourceId) {
+        DatasetUnit datasetUnit = datasets.get(datasetId);
+        if (datasetUnit != null) {
+            IndexUnit indexUnit = datasetUnit.getIndex(resourceId);
+            if (indexUnit != null) {
+                indexUnit.readUnlock();
+            }
+        }
+    }
+
+    Int2ObjectMap<DatasetUnit> getDatasets() {
+        return datasets;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
new file mode 100644
index 0000000..79b978e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.cache.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IDiskSpaceMaker;
+import org.apache.hyracks.cloud.cache.unit.DatasetUnit;
+import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.cloud.sweeper.ISweeper;
+import org.apache.hyracks.cloud.sweeper.Sweeper;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.annotations.CriticalPath;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class DiskCacheSweeperThread implements Runnable, IDiskSpaceMaker {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final long waitTime;
+    private final CloudDiskResourceCacheLockNotifier resourceManager;
+    private final IPhysicalDrive physicalDrive;
+    private final List<IndexUnit> indexes;
+    private final ISweeper sweeper;
+
+    public DiskCacheSweeperThread(ExecutorService executorService, long waitTime,
+            CloudDiskResourceCacheLockNotifier resourceManager, ICloudIOManager cloudIOManager, int numOfSweepThreads,
+            int sweepQueueSize, IPhysicalDrive physicalDrive, BufferCache bufferCache,
+            Map<Integer, BufferedFileHandle> fileInfoMap) {
+        this.waitTime = TimeUnit.SECONDS.toMillis(waitTime);
+        this.resourceManager = resourceManager;
+        this.physicalDrive = physicalDrive;
+        indexes = new ArrayList<>();
+        sweeper = new Sweeper(executorService, cloudIOManager, bufferCache, fileInfoMap, numOfSweepThreads,
+                sweepQueueSize);
+    }
+
+    @Override
+    public void makeSpaceOrThrow(IOException ioException) throws HyracksDataException {
+        if (ioException.getMessage().contains("no space")) {
+            synchronized (this) {
+                // Notify the sweeper thread
+                notify();
+                try {
+                    // Wait for the sweep to finish
+                    wait();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        } else {
+            throw HyracksDataException.create(ioException);
+        }
+    }
+
+    @Override
+    public void run() {
+        Thread.currentThread().setName(this.getClass().getSimpleName());
+        while (true) {
+            synchronized (this) {
+                try {
+                    sweep();
+                    wait(waitTime);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                } finally {
+                    notifyAll();
+                }
+            }
+        }
+    }
+
+    private void sweep() {
+        if (physicalDrive.computeAndCheckIsPressured()) {
+            for (DatasetUnit dataset : resourceManager.getDatasets().values()) {
+                indexes.clear();
+                dataset.getIndexes(indexes);
+                sweepIndexes(sweeper, indexes);
+            }
+        }
+    }
+
+    @CriticalPath
+    private static void sweepIndexes(ISweeper sweeper, List<IndexUnit> indexes) {
+        for (int i = 0; i < indexes.size(); i++) {
+            IndexUnit index = indexes.get(i);
+            if (!index.isSweeping()) {
+                try {
+                    sweeper.sweep(index);
+                } catch (InterruptedException e) {
+                    LOGGER.warn("Sweeping thread interrupted", e);
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/DatasetUnit.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/DatasetUnit.java
new file mode 100644
index 0000000..34b37fb
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/DatasetUnit.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.cache.unit;
+
+import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+
+public final class DatasetUnit {
+    private final int id;
+    private final ReentrantReadWriteLock lock;
+    /**
+     * Maps resourceId to {@link IndexUnit}
+     */
+    private final Long2ObjectMap<IndexUnit> indexes;
+
+    public DatasetUnit(int datasetId) {
+        id = datasetId;
+        lock = new ReentrantReadWriteLock();
+        indexes = new Long2ObjectOpenHashMap<>();
+
+    }
+
+    public int getId() {
+        return id;
+    }
+
+    public IndexUnit addIndex(long resourceId, ILSMIndex index) {
+        writeLock();
+        try {
+            IndexUnit indexUnit = new IndexUnit(resourceId, index);
+            indexes.put(resourceId, indexUnit);
+            return indexUnit;
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    public boolean dropIndex(long resourceId) {
+        IndexUnit indexUnit = indexes.remove(resourceId);
+        // Signal that the index is being dropped so a sweeper thread does not sweep this index or stops sweeping
+        indexUnit.setDropped();
+        // Wait for the sweep operation (if running) before allowing the index to be dropped
+        indexUnit.waitForSweep();
+        return indexUnit.getIndex().isPrimaryIndex();
+    }
+
+    public IndexUnit getIndex(long resourceId) {
+        readLock();
+        try {
+            return indexes.get(resourceId);
+        } finally {
+            readUnlock();
+        }
+    }
+
+    /**
+     * Return the current indexes
+     *
+     * @param indexUnits container used to return the current indexes
+     */
+    public void getIndexes(List<IndexUnit> indexUnits) {
+        readLock();
+        try {
+            indexUnits.addAll(indexes.values());
+        } finally {
+            readUnlock();
+        }
+    }
+
+    private void readLock() {
+        lock.readLock().lock();
+    }
+
+    private void readUnlock() {
+        lock.readLock().unlock();
+    }
+
+    private void writeLock() {
+        lock.writeLock().lock();
+    }
+
+    private void writeUnlock() {
+        lock.writeLock().unlock();
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/IndexUnit.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/IndexUnit.java
new file mode 100644
index 0000000..8f8b412
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/IndexUnit.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.cache.unit;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hyracks.api.util.InvokeUtil;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+// TODO allow evicting an index entirely
+public final class IndexUnit {
+    private final long id;
+    private final ILSMIndex index;
+    private final AtomicBoolean dropped;
+    private final AtomicBoolean sweeping;
+    private final AtomicInteger readCounter;
+
+    public IndexUnit(long resourceId, ILSMIndex index) {
+        this.id = resourceId;
+        this.index = index;
+        dropped = new AtomicBoolean(false);
+        sweeping = new AtomicBoolean(false);
+        readCounter = new AtomicInteger(0);
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    public ILSMIndex getIndex() {
+        return index;
+    }
+
+    public void setDropped() {
+        dropped.set(false);
+    }
+
+    public boolean isDropped() {
+        return dropped.get();
+    }
+
+    public boolean isSweeping() {
+        return sweeping.get();
+    }
+
+    public void startSweeping() {
+        sweeping.set(true);
+    }
+
+    public void waitForSweep() {
+        synchronized (sweeping) {
+            while (sweeping.get()) {
+                // This should not be interrupted until we get a notification the sweep is done
+                InvokeUtil.doUninterruptibly(sweeping::wait);
+            }
+        }
+    }
+
+    public void finishedSweeping() {
+        sweeping.set(false);
+        synchronized (sweeping) {
+            sweeping.notifyAll();
+        }
+    }
+
+    public void readLock() {
+        readCounter.incrementAndGet();
+    }
+
+    public void readUnlock() {
+        readCounter.decrementAndGet();
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/DefaultFileSystemOperator.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/DefaultFileSystemOperator.java
new file mode 100644
index 0000000..e389bf0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/DefaultFileSystemOperator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.filesystem;
+
+import java.nio.channels.FileChannel;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+class DefaultFileSystemOperator implements IFileSystemOperator {
+    @Override
+    public int getFileDescriptor(FileChannel fileChannel) {
+        return -1;
+    }
+
+    @Override
+    public int getBlockSize(int fileDescriptor) throws HyracksDataException {
+        return -1;
+    }
+
+    @Override
+    public long punchHole(int fileDescriptor, long offset, long length, int blockSize) {
+        return 0;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/FileSystemOperationDispatcherUtil.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/FileSystemOperationDispatcherUtil.java
new file mode 100644
index 0000000..6708644
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/FileSystemOperationDispatcherUtil.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.filesystem;
+
+import java.nio.channels.FileChannel;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class FileSystemOperationDispatcherUtil {
+    private static final IFileSystemOperator FILE_SYSTEM_OPERATOR = createFileSystemOperator();
+
+    public static int getFileDescriptor(FileChannel fileChannel) throws HyracksDataException {
+        return FILE_SYSTEM_OPERATOR.getFileDescriptor(fileChannel);
+    }
+
+    public static int getBlockSize(int fileDescriptor) throws HyracksDataException {
+        return FILE_SYSTEM_OPERATOR.getBlockSize(fileDescriptor);
+    }
+
+    public static int punchHole(int fileDescriptor, long offset, long length, int blockSize)
+            throws HyracksDataException {
+        return (int) FILE_SYSTEM_OPERATOR.punchHole(fileDescriptor, offset, length, blockSize);
+    }
+
+    private static IFileSystemOperator createFileSystemOperator() {
+        if (isLinux()) {
+            return new LinuxFileSystemOperator();
+        }
+
+        return new DefaultFileSystemOperator();
+    }
+
+    public static String getOSName() {
+        return System.getProperty("os.name");
+    }
+
+    public static boolean isLinux() {
+        String os = getOSName();
+        return os.contains("linux");
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/IFileSystemOperator.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/IFileSystemOperator.java
new file mode 100644
index 0000000..3b6200e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/IFileSystemOperator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.filesystem;
+
+import java.nio.channels.FileChannel;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An interface for native file system operations
+ */
+interface IFileSystemOperator {
+    /**
+     * Returns the file descriptor of a file
+     *
+     * @param fileChannel opened file channel
+     * @return file descriptor as reported by the OS
+     */
+    int getFileDescriptor(FileChannel fileChannel) throws HyracksDataException;
+
+    /**
+     * Block size of a file (OS dependent)
+     *
+     * @param fileDescriptor of the file
+     * @return block size
+     */
+    int getBlockSize(int fileDescriptor) throws HyracksDataException;
+
+    /**
+     * Punches a hole in a file
+     *
+     * @param fileDescriptor of the file
+     * @param offset         starting offset
+     * @param length         length
+     * @param blockSize      block size
+     * @return length of the hole
+     */
+    long punchHole(int fileDescriptor, long offset, long length, int blockSize) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/LinuxFileSystemOperator.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/LinuxFileSystemOperator.java
new file mode 100644
index 0000000..c8e7ee9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/LinuxFileSystemOperator.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.filesystem;
+
+import java.io.FileDescriptor;
+import java.lang.reflect.Field;
+import java.nio.channels.FileChannel;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import jnr.ffi.LibraryLoader;
+import jnr.posix.FileStat;
+import jnr.posix.POSIX;
+import jnr.posix.POSIXFactory;
+
+class LinuxFileSystemOperator implements IFileSystemOperator {
+    /**
+     * Load native library
+     */
+    private static final LinuxNativeLibC libc = LibraryLoader.create(LinuxNativeLibC.class).failImmediately().load("c");
+    /**
+     * Load POSIX
+     */
+    private static final POSIX posix = POSIXFactory.getPOSIX();
+
+    /**
+     * default is extend size
+     */
+    public static final int FALLOC_FL_KEEP_SIZE = 0x01;
+
+    /**
+     * de-allocates range
+     */
+    public static final int FALLOC_FL_PUNCH_HOLE = 0x02;
+
+    @Override
+    public int getFileDescriptor(FileChannel fileChannel) throws HyracksDataException {
+        FileDescriptor fd = getField(fileChannel, "fd", FileDescriptor.class);
+        return getField(fd, "fd", int.class);
+    }
+
+    @Override
+    public int getBlockSize(int fileDescriptor) throws HyracksDataException {
+        FileStat stat = posix.fstat(fileDescriptor);
+        return Math.toIntExact(stat.blockSize());
+    }
+
+    /**
+     * @param fileDescriptor of the file
+     * @param offset         starting offset
+     * @param length         length
+     * @param blockSize      block size
+     * @return length of the hole
+     * @see <a href="https://github.com/apache/ignite/tree/master/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress">Apache Ignite Internal</a>
+     */
+    @Override
+    public long punchHole(int fileDescriptor, long offset, long length, int blockSize) throws HyracksDataException {
+        assert offset >= 0;
+        assert length > 0;
+
+        if (length < blockSize) {
+            return 0;
+        }
+
+        long off = offset;
+        long len = length;
+        // TODO maybe optimize for power of 2
+        if (off % blockSize != 0) {
+            long end = off + len;
+            off = (off / blockSize + 1) * blockSize;
+            len = end - off;
+
+            if (len <= 0) {
+                return 0;
+            }
+        }
+
+        len = len / blockSize * blockSize;
+
+        if (len > 0) {
+            int res = libc.fallocate(fileDescriptor, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, off, len);
+            if (res != 0) {
+                throw new HyracksDataException("error");
+            }
+        }
+
+        return len;
+    }
+
+    private <T> T getField(Object object, String name, Class<T> clazz) throws HyracksDataException {
+        try {
+            Field field = object.getClass().getDeclaredField(name);
+            field.setAccessible(true);
+            return clazz.cast(field.get(field));
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    public interface LinuxNativeLibC {
+        /**
+         * Allows the caller to directly manipulate the allocated
+         * disk space for the file referred to by fd for the byte range starting
+         * at {@code off} offset and continuing for {@code len} bytes.
+         *
+         * @param fd   file descriptor.
+         * @param mode determines the operation to be performed on the given range.
+         * @param off  required position offset.
+         * @param len  required length.
+         * @return On success, fallocate() returns zero.  On error, -1 is returned and
+         * {@code errno} is set to indicate the error.
+         */
+        int fallocate(int fd, int mode, long off, long len);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/PhysicalDrive.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/PhysicalDrive.java
new file mode 100644
index 0000000..72996b6
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/PhysicalDrive.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.filesystem;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hyracks.api.io.IODeviceHandle;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+@ThreadSafe
+public final class PhysicalDrive implements IPhysicalDrive {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final List<File> drivePaths;
+    private final long pressureSize;
+    private final AtomicBoolean pressured;
+
+    public PhysicalDrive(List<IODeviceHandle> deviceHandles, double pressureThreshold, double storagePercentage,
+            long pressureDebugSize) {
+        drivePaths = getDrivePaths(deviceHandles);
+        pressureSize = getPressureSize(drivePaths, pressureThreshold, storagePercentage, pressureDebugSize);
+        pressured = new AtomicBoolean(getUsedSpace() <= pressureSize);
+    }
+
+    @Override
+    public boolean computeAndCheckIsPressured() {
+        long usedSpace = getUsedSpace();
+        boolean isPressured = usedSpace > pressureSize;
+        pressured.set(isPressured);
+        LOGGER.info("Used space: {}, pressureCapacity: {} (isPressured: {})",
+                StorageUtil.toHumanReadableSize(usedSpace), StorageUtil.toHumanReadableSize(pressureSize), isPressured);
+        return isPressured;
+    }
+
+    @Override
+    public boolean hasSpace() {
+        return !pressured.get();
+    }
+
+    private long getUsedSpace() {
+        long totalUsedSpace = 0;
+        for (int i = 0; i < drivePaths.size(); i++) {
+            File device = drivePaths.get(i);
+            totalUsedSpace += device.getTotalSpace() - device.getFreeSpace();
+        }
+        return totalUsedSpace;
+    }
+
+    private static long getPressureSize(List<File> drivePaths, double pressureThreshold, double storagePercentage,
+            long pressureDebugSize) {
+
+        long totalCapacity = 0;
+        long totalUsedSpace = 0;
+        for (File drive : drivePaths) {
+            totalCapacity += drive.getTotalSpace();
+            totalUsedSpace += drive.getTotalSpace() - drive.getFreeSpace();
+        }
+
+        long allocatedCapacity = (long) (totalCapacity * storagePercentage);
+        long pressureCapacity = pressureDebugSize > 0 ? totalUsedSpace + pressureDebugSize
+                : (long) (allocatedCapacity * pressureThreshold);
+
+        LOGGER.info(
+                "PhysicalDrive configured with diskCapacity: {}, allocatedCapacity: {}, and pressureCapacity: {} (used space: {})",
+                StorageUtil.toHumanReadableSize(totalCapacity), StorageUtil.toHumanReadableSize(allocatedCapacity),
+                StorageUtil.toHumanReadableSize(pressureCapacity), StorageUtil.toHumanReadableSize(totalUsedSpace));
+
+        return pressureCapacity;
+    }
+
+    private static List<File> getDrivePaths(List<IODeviceHandle> deviceHandles) {
+        File[] roots = File.listRoots();
+        Set<File> distinctUsedRoots = new HashSet<>();
+        for (IODeviceHandle handle : deviceHandles) {
+            File handlePath = handle.getMount();
+            for (File root : roots) {
+                if (handlePath.getAbsolutePath().startsWith(root.getAbsolutePath())
+                        && !distinctUsedRoots.contains(root)) {
+                    distinctUsedRoots.add(root);
+                    break;
+                }
+            }
+        }
+        return new ArrayList<>(distinctUsedRoots);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
new file mode 100644
index 0000000..430aac7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.io;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
+
+/**
+ * Certain operations needed to be provided by {@link org.apache.hyracks.api.io.IIOManager} to support cloud
+ * file operations in a cloud deployment.
+ */
+public interface ICloudIOManager {
+    /**
+     * Read from the cloud
+     *
+     * @param fHandle file handle
+     * @param offset  starting offset
+     * @param data    buffer to read to
+     */
+    void cloudRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
+
+    /**
+     * Read from the cloud
+     *
+     * @param fHandle file handle
+     * @param offset  starting offset
+     * @return input stream of the required data
+     */
+    InputStream cloudRead(IFileHandle fHandle, long offset, long length);
+
+    /**
+     * Write to local drive only
+     *
+     * @param fHandle file handle
+     * @param offset  starting offset
+     * @param data    to write
+     */
+
+    int localWriter(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
+
+    /**
+     * Write to cloud only
+     *
+     * @param fHandle file handle
+     * @param offset  starting offset
+     * @param data    to write
+     * @return number of written bytes
+     */
+    int cloudWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
+
+    /**
+     * Write to cloud only
+     *
+     * @param fHandle file handle
+     * @param offset  starting offset
+     * @param data    to write
+     * @return number of written bytes
+     */
+    long cloudWrite(IFileHandle fHandle, long offset, ByteBuffer[] data) throws HyracksDataException;
+
+    /**
+     * Punch a hole in a file
+     *
+     * @param fHandle file handle
+     * @param offset  starting offset
+     * @param length  length
+     */
+    int punchHole(IFileHandle fHandle, long offset, long length) throws HyracksDataException;
+
+    /**
+     * Evict a directory from the local disk cache
+     *
+     * @param directory to evict
+     */
+    void evict(FileReference directory) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java
new file mode 100644
index 0000000..d064fda
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.sweeper;
+
+import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+
+/**
+ * Sweeps an index to relieve the pressure on a local {@link IPhysicalDrive}
+ */
+@FunctionalInterface
+public interface ISweeper {
+    /**
+     * Sweep an index
+     *
+     * @param indexUnit to sweep
+     */
+    void sweep(IndexUnit indexUnit) throws InterruptedException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
new file mode 100644
index 0000000..0410e3b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.sweeper;
+
+import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+
+public final class NoOpSweeper implements ISweeper {
+    public static final ISweeper INSTANCE = new NoOpSweeper();
+
+    private NoOpSweeper() {
+    }
+
+    @Override
+    public void sweep(IndexUnit indexUnit) {
+        // NoOp
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
new file mode 100644
index 0000000..773b307
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.sweeper;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
+import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.context.page.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.disk.ISweepContext;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+
+public final class SweepContext implements ISweepContext {
+    private final ICloudIOManager cloudIOManager;
+    private final BufferCache bufferCache;
+    private final Map<Integer, BufferedFileHandle> fileInfoMap;
+    private final AtomicBoolean shutdown;
+    private IndexUnit indexUnit;
+    private BufferedFileHandle handle;
+
+    public SweepContext(ICloudIOManager cloudIOManager, BufferCache bufferCache,
+            Map<Integer, BufferedFileHandle> fileInfoMap, AtomicBoolean shutdown) {
+        this.cloudIOManager = cloudIOManager;
+        this.bufferCache = bufferCache;
+        this.fileInfoMap = fileInfoMap;
+        this.shutdown = shutdown;
+    }
+
+    @Override
+    public void open(int fileId) throws HyracksDataException {
+        close();
+        bufferCache.openFile(fileId);
+        synchronized (fileInfoMap) {
+            this.handle = fileInfoMap.get(fileId);
+        }
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (handle != null) {
+            bufferCache.closeFile(handle.getFileId());
+            handle = null;
+        }
+    }
+
+    @Override
+    public CloudCachedPage pin(long dpid, IBufferCacheReadContext bcOpCtx) throws HyracksDataException {
+        return (CloudCachedPage) bufferCache.pin(dpid, bcOpCtx);
+    }
+
+    @Override
+    public void unpin(ICachedPage page, IBufferCacheReadContext bcOpCtx) throws HyracksDataException {
+        bufferCache.unpin(page, bcOpCtx);
+    }
+
+    public void setIndexUnit(IndexUnit indexUnit) {
+        this.indexUnit = indexUnit;
+    }
+
+    public IndexUnit getIndexUnit() {
+        return indexUnit;
+    }
+
+    public int punchHole(int startPageId, int numberOfPage) throws HyracksDataException {
+        long offset = handle.getStartPageOffset(startPageId);
+        long length = handle.getPagesTotalSize(startPageId, numberOfPage);
+        return cloudIOManager.punchHole(handle.getFileHandle(), offset, length);
+    }
+
+    /**
+     * Whether the sweep operation should stop or proceed
+     * Stopping condition:
+     * 1- Either the sweeper thread is shutting down
+     * 2- OR the index was dropped
+     *
+     * @return true if it should stop, false otherwise
+     */
+    public boolean stopSweeping() {
+        return shutdown.get() || indexUnit.isDropped();
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
new file mode 100644
index 0000000..36c4a12
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.sweeper;
+
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.InvokeUtil;
+import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.cloud.IIndexDiskCacheManager;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public final class Sweeper implements ISweeper {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final SweepRequest POISON = new SweepRequest();
+    private final BlockingQueue<SweepRequest> requests;
+    private final BlockingQueue<SweepRequest> freeRequests;
+    private final AtomicBoolean shutdown;
+
+    public Sweeper(ExecutorService executor, ICloudIOManager cloudIOManager, BufferCache bufferCache,
+            Map<Integer, BufferedFileHandle> fileInfoMap, int numOfSweepThreads, int queueSize) {
+        requests = new ArrayBlockingQueue<>(queueSize);
+        freeRequests = new ArrayBlockingQueue<>(queueSize);
+        shutdown = new AtomicBoolean(false);
+        for (int i = 0; i < queueSize; i++) {
+            freeRequests.add(new SweepRequest(new SweepContext(cloudIOManager, bufferCache, fileInfoMap, shutdown)));
+        }
+        for (int i = 0; i < numOfSweepThreads; i++) {
+            executor.execute(new SweepThread(requests, freeRequests, i));
+        }
+    }
+
+    @Override
+    public void sweep(IndexUnit indexUnit) throws InterruptedException {
+        SweepRequest request = freeRequests.take();
+        request.reset(indexUnit);
+        requests.put(request);
+    }
+
+    public void shutdown() {
+        shutdown.set(true);
+        requests.clear();
+        freeRequests.clear();
+        requests.offer(POISON);
+        // TODO wait for threads to terminate
+    }
+
+    private static class SweepThread implements Runnable {
+        private final BlockingQueue<SweepRequest> requests;
+        private final BlockingQueue<SweepRequest> freeRequests;
+        private final int threadNumber;
+
+        private SweepThread(BlockingQueue<SweepRequest> requests, BlockingQueue<SweepRequest> freeRequests,
+                int threadNumber) {
+            this.requests = requests;
+            this.freeRequests = freeRequests;
+            this.threadNumber = threadNumber;
+        }
+
+        @Override
+        public void run() {
+            Thread.currentThread().setName(getClass().getSimpleName() + "-" + threadNumber);
+            while (true) {
+                SweepRequest request = null;
+                try {
+                    request = requests.take();
+                    if (isPoison(request)) {
+                        break;
+                    }
+                    request.handle();
+                } catch (InterruptedException e) {
+                    LOGGER.warn("Ignoring interrupt. Sweep threads should never be interrupted.");
+                } catch (Throwable t) {
+                    LOGGER.error("Sweep failed", t);
+                } finally {
+                    if (request != null && request != POISON) {
+                        freeRequests.add(request);
+                    }
+                }
+
+            }
+        }
+
+        private boolean isPoison(SweepRequest request) {
+            if (request == POISON) {
+                LOGGER.info("Exiting");
+                InvokeUtil.doUninterruptibly(() -> requests.put(POISON));
+                if (Thread.interrupted()) {
+                    LOGGER.error("Ignoring interrupt. Sweep threads should never be interrupted.");
+                }
+                return true;
+            }
+
+            return false;
+        }
+    }
+
+    private static class SweepRequest {
+        private final SweepContext context;
+
+        SweepRequest() {
+            this(null);
+        }
+
+        SweepRequest(SweepContext context) {
+            this.context = context;
+        }
+
+        void reset(IndexUnit indexUnit) {
+            context.setIndexUnit(indexUnit);
+        }
+
+        void handle() throws HyracksDataException {
+            if (context.stopSweeping()) {
+                /*
+                 * This could happen as the sweeper gets a copy of a list of all indexUnits at a certain point
+                 * of time. However, after acquiring the list and the start of handling this sweep request, we found
+                 * that the index of this request was dropped.
+                 *
+                 * To illustrate:
+                 * 1- The Sweeper got a list of all indexUnits (say index_1 and index_2)
+                 * 2a- The Sweeper started sweeping index_1
+                 * 2b- index_2 was dropped
+                 * 3- The sweeper finished sweeping index_1 and started sweeping index_2. However, index_2 was
+                 * dropped at 2b.
+                 */
+                return;
+            }
+            IndexUnit indexUnit = context.getIndexUnit();
+            indexUnit.startSweeping();
+            try {
+                ILSMIndex index = indexUnit.getIndex();
+                IIndexDiskCacheManager diskCacheManager = index.getDiskCacheManager();
+                if (!diskCacheManager.isActive()) {
+                    return;
+                }
+
+                if (diskCacheManager.isSweepRequirePlanning()) {
+                    // Manager require planning
+                    diskCacheManager.prepareSweepPlan();
+                }
+                // Currently, we always sweep.
+                // But we in the future we can only do a planning and sweep with another request
+                diskCacheManager.sweep(context);
+            } finally {
+                indexUnit.finishedSweeping();
+            }
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/test/java/org/apache/hyracks/cloud/filesystem/PhysicalDriveTest.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/test/java/org/apache/hyracks/cloud/filesystem/PhysicalDriveTest.java
new file mode 100644
index 0000000..9e9c782
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/test/java/org/apache/hyracks/cloud/filesystem/PhysicalDriveTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.filesystem;
+
+import java.io.File;
+
+import javax.swing.filechooser.FileSystemView;
+
+import org.junit.Test;
+
+public class PhysicalDriveTest {
+
+    @Test
+    public void test() {
+        FileSystemView fileSystemView = FileSystemView.getFileSystemView();
+        File[] roots = File.listRoots();
+        File file = new File(".");
+
+        for (File root : roots) {
+            if (file.getAbsolutePath().startsWith(root.getAbsolutePath())) {
+                System.err.println(root.getAbsolutePath() + " is drive: " + fileSystemView.getSystemDisplayName(root));
+            }
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
index 69fc6d1..bdac239 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
@@ -31,6 +31,7 @@
 import org.apache.hyracks.storage.common.IResourceLifecycleManager;
 import org.apache.hyracks.storage.common.buffercache.BufferCache;
 import org.apache.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
+import org.apache.hyracks.storage.common.buffercache.DefaultDiskCachedPageAllocator;
 import org.apache.hyracks.storage.common.buffercache.DelayPageCleanerPolicy;
 import org.apache.hyracks.storage.common.buffercache.HeapBufferAllocator;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
@@ -44,7 +45,9 @@
 import org.apache.hyracks.storage.common.file.ResourceIdFactory;
 import org.apache.hyracks.storage.common.file.ResourceIdFactoryProvider;
 import org.apache.hyracks.storage.common.file.TransientLocalResourceRepositoryFactory;
+import org.apache.hyracks.util.annotations.TestOnly;
 
+@TestOnly
 public class RuntimeContext {
     private final IIOManager ioManager;
     private final IBufferCache bufferCache;
@@ -56,7 +59,8 @@
     public RuntimeContext(INCServiceContext appCtx) throws HyracksDataException {
         fileMapManager = new FileMapManager();
         ICacheMemoryAllocator allocator = new HeapBufferAllocator();
-        IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator, 32768, 50);
+        IPageReplacementStrategy prs =
+                new ClockPageReplacementStrategy(allocator, DefaultDiskCachedPageAllocator.INSTANCE, 32768, 50);
         ThreadFactory threadFactory = Thread::new;
         this.ioManager = appCtx.getIoManager();
         bufferCache = new BufferCache(ioManager, prs, new DelayPageCleanerPolicy(1000), fileMapManager, 100, 10,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
index 932ad6c..ddd8dfd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
@@ -32,10 +32,12 @@
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.annotations.TestOnly;
 
 /**
  * @deprecated This class must not be used. Instead, use {@link AppendOnlyLinkedMetadataPageManager}
  */
+@TestOnly
 @Deprecated
 public class LinkedMetaDataPageManager implements IMetadataPageManager {
     private final IBufferCache bufferCache;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index 4756921..0869fed 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.cloud.IIndexDiskCacheManager;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMHarness;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
@@ -99,12 +100,8 @@
      * Create a flush operation.
      * This is an atomic operation. If an exception is thrown, no partial effect is left
      *
+     * @param ctx the operation context
      * @return the flush operation
-     *
-     * @param ctx
-     *            the operation context
-     * @param callback
-     *            the IO callback
      * @throws HyracksDataException
      */
     ILSMIOOperation createFlushOperation(ILSMIndexOperationContext ctx) throws HyracksDataException;
@@ -115,10 +112,7 @@
      * Create a merge operation.
      * This is an atomic operation. If an exception is thrown, no partial effect is left
      *
-     * @param ctx
-     *            the operation context
-     * @param callback
-     *            the IO callback
+     * @param ctx the operation context
      * @throws HyracksDataException
      */
     ILSMIOOperation createMergeOperation(ILSMIndexOperationContext ctx) throws HyracksDataException;
@@ -141,8 +135,7 @@
     /**
      * Populates the context's component holder with a snapshot of the components involved in the operation.
      *
-     * @param ctx
-     *            - the operation's context
+     * @param ctx - the operation's context
      * @throws HyracksDataException
      */
     void getOperationalComponents(ILSMIndexOperationContext ctx) throws HyracksDataException;
@@ -231,4 +224,9 @@
      */
     void resetCurrentComponentIndex();
 
+    /**
+     * @return disk cache manager
+     */
+    IIndexDiskCacheManager getDiskCacheManager();
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/cloud/DefaultIndexDiskCacheManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/cloud/DefaultIndexDiskCacheManager.java
new file mode 100644
index 0000000..fd0c983
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/cloud/DefaultIndexDiskCacheManager.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.cloud;
+
+import org.apache.hyracks.storage.common.disk.ISweepContext;
+
+public final class DefaultIndexDiskCacheManager implements IIndexDiskCacheManager {
+    public static final IIndexDiskCacheManager INSTANCE = new DefaultIndexDiskCacheManager();
+    private static final String NOT_SWEEPABLE_ERR_MSG = "Index is not sweepable";
+
+    @Override
+    public boolean isActive() {
+        return false;
+    }
+
+    @Override
+    public boolean isSweepable() {
+        return false;
+    }
+
+    @Override
+    public boolean isSweepRequirePlanning() {
+        throw new IllegalStateException(NOT_SWEEPABLE_ERR_MSG);
+    }
+
+    @Override
+    public void prepareSweepPlan() {
+        throw new IllegalStateException(NOT_SWEEPABLE_ERR_MSG);
+    }
+
+    @Override
+    public long sweep(ISweepContext context) {
+        throw new IllegalStateException(NOT_SWEEPABLE_ERR_MSG);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/cloud/IIndexDiskCacheManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/cloud/IIndexDiskCacheManager.java
new file mode 100644
index 0000000..ee393af
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/cloud/IIndexDiskCacheManager.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.cloud;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.disk.ISweepContext;
+
+/**
+ * Disk cache manager for an index
+ */
+public interface IIndexDiskCacheManager {
+    /**
+     * @return the manager is active
+     * @see ILSMIndex#activate()
+     */
+    boolean isActive();
+
+    /**
+     * @return whether an index can be swept to make space in {@link IPhysicalDrive}
+     */
+    boolean isSweepable();
+
+    /**
+     * @return whether a sweep operation requires planning
+     */
+    boolean isSweepRequirePlanning();
+
+    /**
+     * Prepare a sweep plan
+     */
+    void prepareSweepPlan();
+
+    /**
+     * Sweep an index to make space in {@link IPhysicalDrive}
+     *
+     * @param context sweep context
+     * @return freed space in bytes
+     */
+    long sweep(ISweepContext context) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index dfce00f..0534754 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -68,6 +68,8 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMPageWriteCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.am.lsm.common.cloud.DefaultIndexDiskCacheManager;
+import org.apache.hyracks.storage.am.lsm.common.cloud.IIndexDiskCacheManager;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
 import org.apache.hyracks.storage.common.IIndexCursor;
@@ -957,4 +959,8 @@
         return mergePolicy;
     }
 
+    @Override
+    public IIndexDiskCacheManager getDiskCacheManager() {
+        return DefaultIndexDiskCacheManager.INSTANCE;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index 0c442a7..a559a8c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -297,13 +297,20 @@
 
     protected FileReference getCompressedFileReferenceIfAny(String name) {
         final ICompressorDecompressor compDecomp = compressorDecompressorFactory.createInstance();
+        FileReference treeFileRef;
         //Avoid creating LAF file for NoOpCompressorDecompressor
         if (compDecomp != NoOpCompressorDecompressor.INSTANCE && isCompressible(name)) {
             final String path = baseDir.getChildPath(name);
-            return new CompressedFileReference(baseDir.getDeviceHandle(), compDecomp, path, path + LAF_SUFFIX);
+            treeFileRef = new CompressedFileReference(baseDir.getDeviceHandle(), compDecomp, path, path + LAF_SUFFIX);
+        } else {
+            treeFileRef = baseDir.getChild(name);
         }
 
-        return baseDir.getChild(name);
+        if (areHolesAllowed()) {
+            treeFileRef.setHolesAllowed();
+        }
+
+        return treeFileRef;
     }
 
     protected void cleanLookAsideFiles(Set<String> groundTruth, IBufferCache bufferCache) throws HyracksDataException {
@@ -321,6 +328,10 @@
         }
     }
 
+    protected boolean areHolesAllowed() {
+        return false;
+    }
+
     private boolean isCompressible(String fileName) {
         return !fileName.endsWith(BLOOM_FILTER_SUFFIX) && !fileName.endsWith(DELETE_TREE_SUFFIX);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
index 1bceea3..1091040 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
@@ -27,6 +27,8 @@
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.common.buffercache.context.page.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.buffercache.context.page.IBufferCacheWriteContext;
 import org.apache.hyracks.storage.common.compression.file.CompressedFileReference;
 import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
 import org.apache.hyracks.util.annotations.NotThreadSafe;
@@ -65,42 +67,37 @@
     /**
      * Read the CachedPage from disk
      *
-     * @param cPage
-     *            CachedPage in {@link BufferCache}
-     * @throws HyracksDataException
+     * @param cPage   CachedPage in {@link BufferCache}
+     * @param context read context
      */
-    public abstract void read(CachedPage cPage) throws HyracksDataException;
+    public abstract void read(CachedPage cPage, IBufferCacheReadContext context) throws HyracksDataException;
 
     /**
      * Write the CachedPage into disk
      *
-     * @param cPage
-     *            CachedPage in {@link BufferCache}
-     * @throws HyracksDataException
+     * @param cPage   CachedPage in {@link BufferCache}
+     * @param context write context
      */
-    public void write(CachedPage cPage) throws HyracksDataException {
+    public void write(CachedPage cPage, IBufferCacheWriteContext context) throws HyracksDataException {
         final int totalPages = cPage.getFrameSizeMultiplier();
         final int extraBlockPageId = cPage.getExtraBlockPageId();
         final BufferCacheHeaderHelper header = checkoutHeaderHelper();
-        write(cPage, header, totalPages, extraBlockPageId);
+        write(cPage, header, totalPages, extraBlockPageId, context);
     }
 
     /**
-     * Write the CachedPage into disk called by {@link AbstractBufferedFileIOManager#write(CachedPage)}
+     * Write the CachedPage into disk called by
+     * {@link AbstractBufferedFileIOManager#write(CachedPage, IBufferCacheWriteContext)}
      * Note: It is the responsibility of the caller to return {@link BufferCacheHeaderHelper}
      *
-     * @param cPage
-     *            CachedPage that will be written
-     * @param header
-     *            HeaderHelper to add into the written page
-     * @param totalPages
-     *            Number of pages to be written
-     * @param extraBlockPageId
-     *            Extra page ID in case it has more than one page
-     * @throws HyracksDataException
+     * @param cPage            CachedPage that will be written
+     * @param header           HeaderHelper to add into the written page
+     * @param totalPages       Number of pages to be written
+     * @param extraBlockPageId Extra page ID in case it has more than one page
+     * @param context          write context
      */
     protected abstract void write(CachedPage cPage, BufferCacheHeaderHelper header, int totalPages,
-            int extraBlockPageId) throws HyracksDataException;
+            int extraBlockPageId, IBufferCacheWriteContext context) throws HyracksDataException;
 
     /* ********************************
      * File operations' methods
@@ -133,11 +130,14 @@
         ioManager.close(fileHandle);
     }
 
+    public IFileHandle getFileHandle() {
+        return fileHandle;
+    }
+
     /**
      * Force the file into disk
      *
-     * @param metadata
-     *            see {@link java.nio.channels.FileChannel#force(boolean)}
+     * @param metadata see {@link java.nio.channels.FileChannel#force(boolean)}
      * @throws HyracksDataException
      */
     public void force(boolean metadata) throws HyracksDataException {
@@ -145,6 +145,14 @@
     }
 
     /**
+     * Return start offset of a page
+     *
+     * @param pageId page ID
+     * @return offset
+     */
+    public abstract long getStartPageOffset(int pageId) throws HyracksDataException;
+
+    /**
      * Get the number of pages in the file
      *
      * @throws HyracksDataException
@@ -158,8 +166,7 @@
     /**
      * Check whether the file has been deleted
      *
-     * @return
-     *         true if has been deleted, false o.w
+     * @return true if has been deleted, false o.w
      */
     public boolean hasBeenDeleted() {
         return fileHandle == null;
@@ -168,8 +175,7 @@
     /**
      * Check whether the file has ever been opened
      *
-     * @return
-     *         true if has ever been opened, false otherwise
+     * @return true if has ever been opened, false otherwise
      */
     public final boolean hasBeenOpened() {
         return hasOpen;
@@ -235,6 +241,15 @@
 
     public abstract ICompressedPageWriter getCompressedPageWriter();
 
+    /**
+     * Compute the total size of pages
+     *
+     * @param startPageId   page ID to start from
+     * @param numberOfPages the number of pages
+     * @return total size of pages in bytes
+     */
+    public abstract long getPagesTotalSize(int startPageId, int numberOfPages) throws HyracksDataException;
+
     /* ********************************
      * Common helper methods
      * ********************************
@@ -243,20 +258,16 @@
     /**
      * Get the offset for the first page
      *
-     * @param cPage
-     *            CachedPage for which the offset is needed
-     * @return
-     *         page offset in the file
+     * @param cPage CachedPage for which the offset is needed
+     * @return page offset in the file
      */
     protected abstract long getFirstPageOffset(CachedPage cPage);
 
     /**
      * Get the offset for the extra page
      *
-     * @param cPage
-     *            CachedPage for which the offset is needed
-     * @return
-     *         page offset in the file
+     * @param cPage CachedPage for which the offset is needed
+     * @return page offset in the file
      */
     protected abstract long getExtraPageOffset(CachedPage cPage);
 
@@ -276,11 +287,11 @@
         return ioManager.syncRead(fileHandle, offset, buf);
     }
 
-    protected final long writeToFile(ByteBuffer buf, long offset) throws HyracksDataException {
+    protected final long writeExtraToFile(ByteBuffer buf, long offset) throws HyracksDataException {
         return ioManager.doSyncWrite(fileHandle, offset, buf);
     }
 
-    protected final long writeToFile(ByteBuffer[] buf, long offset) throws HyracksDataException {
+    protected final long writeExtraToFile(ByteBuffer[] buf, long offset) throws HyracksDataException {
         return ioManager.doSyncWrite(fileHandle, offset, buf);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 1660b7e..5d25685 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -568,7 +568,7 @@
     private void read(CachedPage cPage, IBufferCacheReadContext context) throws HyracksDataException {
         BufferedFileHandle fInfo = getFileHandle(cPage);
         cPage.buffer.clear();
-        fInfo.read(cPage);
+        fInfo.read(cPage, context);
         final IThreadStats threadStats = statsSubscribers.get(Thread.currentThread());
         if (threadStats != null && context.incrementStats()) {
             threadStats.coldRead();
@@ -588,7 +588,7 @@
             if (fInfo.hasBeenDeleted()) {
                 return;
             }
-            fInfo.write(cPage);
+            fInfo.write(cPage, context);
         }
 
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCacheHeaderHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCacheHeaderHelper.java
index a105e60..8f9054a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCacheHeaderHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCacheHeaderHelper.java
@@ -23,17 +23,19 @@
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.compression.ICompressorDecompressor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.control.nc.io.IOManager;
 
 public class BufferCacheHeaderHelper {
     public static final int FRAME_MULTIPLIER_OFF = 0;
     public static final int EXTRA_BLOCK_PAGE_ID_OFF = FRAME_MULTIPLIER_OFF + 4; // 4
 
     private final ByteBuffer[] array;
-    private final int pageSizeWithHeader;
     private ByteBuffer buf;
 
     public BufferCacheHeaderHelper(int pageSize) {
-        this.pageSizeWithHeader = RESERVED_HEADER_BYTES + pageSize;
+        int pageSizeWithHeader = RESERVED_HEADER_BYTES + pageSize;
         buf = ByteBuffer.allocate(pageSizeWithHeader);
         array = new ByteBuffer[] { buf, null };
     }
@@ -54,34 +56,47 @@
         return buf;
     }
 
+    public int writeToFile(IOManager ioManager, IFileHandle handle, ByteBuffer buffer, long offset)
+            throws HyracksDataException {
+        // TODO include CRC32?
+        return ioManager.doSyncWrite(handle, offset, buffer);
+    }
+
+    public long writeToFile(IOManager ioManager, IFileHandle handle, ByteBuffer[] buffers, long offset)
+            throws HyracksDataException {
+        // TODO include CRC32?
+        return ioManager.doSyncWrite(handle, offset, buffers);
+    }
+
+    public int readFromFile(IOManager ioManager, IFileHandle handle, long offset, int size)
+            throws HyracksDataException {
+        buf.position(0);
+        buf.limit(size);
+        // TODO check whether the CRC32 is valid or not
+        return ioManager.syncRead(handle, offset, buf);
+    }
+
     public ByteBuffer prepareRead(int size) {
         buf.position(0);
         buf.limit(size);
         return buf;
     }
 
-    public ByteBuffer processHeader(CachedPage cPage) {
-        cPage.setFrameSizeMultiplier(buf.getInt(FRAME_MULTIPLIER_OFF));
-        cPage.setExtraBlockPageId(buf.getInt(EXTRA_BLOCK_PAGE_ID_OFF));
-        buf.position(RESERVED_HEADER_BYTES);
-        return buf;
-    }
-
     public ByteBuffer getBuffer() {
         return buf;
     }
 
     private void setPageInfo(CachedPage cPage) {
         buf.putInt(FRAME_MULTIPLIER_OFF, cPage.getFrameSizeMultiplier());
+        // TODO EXTRA_BLOCK_PAGE_ID_OFF is always going to be the following page, use it for CRC32 instead?
         buf.putInt(EXTRA_BLOCK_PAGE_ID_OFF, cPage.getExtraBlockPageId());
     }
 
     /**
-     * {@link ICompressorDecompressor#compress(byte[], int, int, byte[], int)} may require additional
-     * space to do the compression. see {@link ICompressorDecompressor#computeCompressedBufferSize(int)}.
+     * {@link ICompressorDecompressor} may require additional space to do the compression.
      *
-     * @param compressor
-     * @param size
+     * @param size required size
+     * @see ICompressorDecompressor#computeCompressedBufferSize(int)
      */
     private void ensureBufferCapacity(int size) {
         final int requiredSize = size + RESERVED_HEADER_BYTES;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
index 1d4c789..6fd18ff 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
@@ -31,12 +31,12 @@
     ByteBuffer buffer;
     public final AtomicInteger pinCount;
     final AtomicBoolean dirty;
-    final ReentrantReadWriteLock latch;
+    protected final ReentrantReadWriteLock latch;
     private final Object replacementStrategyObject;
     private final IPageReplacementStrategy pageReplacementStrategy;
     volatile long dpid; // disk page id (composed of file id and page id)
     CachedPage next;
-    volatile boolean valid;
+    protected volatile boolean valid;
     final AtomicBoolean confiscated;
     private int multiplier;
     private int extraBlockPageId;
@@ -190,6 +190,7 @@
         return multiplier > 1;
     }
 
+    @Override
     public void setCompressedPageOffset(long offset) {
         this.compressedOffset = offset;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ClockPageReplacementStrategy.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ClockPageReplacementStrategy.java
index 87a15d3..7707ac8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ClockPageReplacementStrategy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ClockPageReplacementStrategy.java
@@ -32,16 +32,19 @@
     private static final int MAX_UNSUCCESSFUL_CYCLE_COUNT = 3;
 
     private IBufferCacheInternal bufferCache;
-    private AtomicInteger clockPtr;
-    private ICacheMemoryAllocator allocator;
-    private AtomicInteger numPages;
-    private AtomicInteger cpIdCounter;
+    private final AtomicInteger clockPtr;
+    private final ICacheMemoryAllocator allocator;
+    private final AtomicInteger numPages;
+    private final AtomicInteger cpIdCounter;
+    private final IDiskCachedPageAllocator pageAllocator;
     private final int pageSize;
     private final int maxAllowedNumPages;
     private final ConcurrentLinkedQueue<Integer> cpIdFreeList;
 
-    public ClockPageReplacementStrategy(ICacheMemoryAllocator allocator, int pageSize, int maxAllowedNumPages) {
+    public ClockPageReplacementStrategy(ICacheMemoryAllocator allocator, IDiskCachedPageAllocator pageAllocator,
+            int pageSize, int maxAllowedNumPages) {
         this.allocator = allocator;
+        this.pageAllocator = pageAllocator;
         this.pageSize = pageSize;
         this.maxAllowedNumPages = maxAllowedNumPages;
         this.clockPtr = new AtomicInteger(0);
@@ -154,7 +157,7 @@
         if (cpId == null) {
             cpId = cpIdCounter.getAndIncrement();
         }
-        CachedPage cPage = new CachedPage(cpId, allocator.allocate(pageSize * multiplier, 1)[0], this);
+        CachedPage cPage = pageAllocator.allocate(cpId, allocator.allocate(pageSize * multiplier, 1)[0], this);
         cPage.setFrameSizeMultiplier(multiplier);
         bufferCache.addPage(cPage);
         numPages.getAndAdd(multiplier);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/page/DefaultBufferCacheReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/page/DefaultBufferCacheReadContext.java
index 2a67a06..b44ae25 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/page/DefaultBufferCacheReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/page/DefaultBufferCacheReadContext.java
@@ -18,7 +18,7 @@
  */
 package org.apache.hyracks.storage.common.buffercache.context.page;
 
-public final class DefaultBufferCacheReadContext extends AbstractBufferCacheReadContext {
+final class DefaultBufferCacheReadContext extends AbstractBufferCacheReadContext {
     @Override
     public boolean isNewPage() {
         return false;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/page/IBufferCacheReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/page/IBufferCacheReadContext.java
index 216ebfb..f63ab76 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/page/IBufferCacheReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/page/IBufferCacheReadContext.java
@@ -38,7 +38,7 @@
      *
      * @param page that will be pinned
      */
-    void onPin(ICachedPage page);
+    void onPin(ICachedPage page) throws HyracksDataException;
 
     /**
      * Signals that a page will be unpinned
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/page/IBufferCacheWriteContext.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/page/IBufferCacheWriteContext.java
index 7afb4ce..9b3761a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/page/IBufferCacheWriteContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/page/IBufferCacheWriteContext.java
@@ -23,9 +23,33 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IFileHandle;
 import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.IFIFOPageWriter;
 
+/**
+ * This context allows for providing different implementations on {@link IFIFOPageWriter#write(ICachedPage)}
+ */
 public interface IBufferCacheWriteContext {
+    /**
+     * Write to a file managed by {@link IBufferCache}
+     *
+     * @param ioManager ioManager
+     * @param handle    file handle
+     * @param offset    starting offset
+     * @param data      to be written
+     * @return number of written bytes
+     */
     int write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer data) throws HyracksDataException;
 
+    /**
+     * Write to a file managed by {@link IBufferCache}
+     *
+     * @param ioManager ioManager
+     * @param handle    file handle
+     * @param offset    starting offset
+     * @param data      to be written
+     * @return number of written bytes
+     */
     long write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer[] data) throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/DummyPhysicalDrive.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/DummyPhysicalDrive.java
new file mode 100644
index 0000000..af4162b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/DummyPhysicalDrive.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.disk;
+
+public final class DummyPhysicalDrive implements IPhysicalDrive {
+    public static final IPhysicalDrive INSTANCE = new DummyPhysicalDrive();
+
+    private DummyPhysicalDrive() {
+    }
+
+    @Override
+    public boolean computeAndCheckIsPressured() {
+        return false;
+    }
+
+    @Override
+    public boolean hasSpace() {
+        return true;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java
new file mode 100644
index 0000000..5fb7088
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.disk;
+
+/**
+ * Disk cache monitoring service is responsible for monitor the local drives
+ */
+public interface IDiskCacheMonitoringService {
+    /**
+     * Start the monitoring service
+     */
+    void start();
+
+    /**
+     * Stop the monitoring service
+     */
+    void stop();
+
+    /**
+     * @return whether the monitoring service is enabled
+     */
+    boolean isEnabled();
+
+    /**
+     * @return physical drive
+     */
+    IPhysicalDrive getPhysicalDrive();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
new file mode 100644
index 0000000..fc4645d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.disk;
+
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.LocalResource;
+
+/**
+ * A proxy to notify a disk-cache (a faster disk that is caching a slower resource) about resource lifecycle events.
+ * The notifier could block a resource from being operated on if the disk-cache manager denying access to a resource
+ * TODO Do we care about dataset?
+ */
+public interface IDiskResourceCacheLockNotifier {
+    /**
+     * Notify registering a new resource
+     * Note: this method is not thread-safe outside {@link org.apache.hyracks.storage.common.IResourceLifecycleManager}
+     *
+     * @param datasetId     dataset ID
+     * @param localResource resource to be registered
+     * @param index         of the resource
+     */
+    void onRegister(int datasetId, LocalResource localResource, IIndex index);
+
+    /**
+     * Notify unregistering an existing resource
+     * Note: this method is not thread-safe outside {@link org.apache.hyracks.storage.common.IResourceLifecycleManager}
+     *
+     * @param datasetId  dataset ID
+     * @param resourceId resource ID
+     */
+    void onUnregister(int datasetId, long resourceId);
+
+    /**
+     * Notify opening a resource
+     *
+     * @param datasetId  dataset ID
+     * @param resourceId resource ID
+     */
+    void onOpen(int datasetId, long resourceId);
+
+    /**
+     * Notify closing a resource
+     *
+     * @param datasetId  dataset ID
+     * @param resourceId resource ID
+     */
+    void onClose(int datasetId, long resourceId);
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IPhysicalDrive.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IPhysicalDrive.java
new file mode 100644
index 0000000..d2aaec1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IPhysicalDrive.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.disk;
+
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+/**
+ * Monitors the pressure of the local drive
+ */
+@ThreadSafe
+public interface IPhysicalDrive {
+    /**
+     * Computes the actual pressure on the drive
+     *
+     * @return true if pressured, false otherwise
+     */
+    boolean computeAndCheckIsPressured();
+
+    /**
+     * @return if drive is still has free space (i.e., not pressured)
+     */
+    boolean hasSpace();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/ISweepContext.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/ISweepContext.java
new file mode 100644
index 0000000..1646116
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/ISweepContext.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.disk;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.context.page.IBufferCacheReadContext;
+
+/**
+ * Provides the necessary {@link IBufferCache} functionalities for a sweep operation
+ */
+public interface ISweepContext {
+
+    /**
+     * Open a file for sweeping
+     *
+     * @param fileId to open
+     */
+    void open(int fileId) throws HyracksDataException;
+
+    /**
+     * Close the opened file
+     */
+    void close() throws HyracksDataException;
+
+    /**
+     * Pin a page
+     *
+     * @param dpid    page unique ID
+     * @param bcOpCtx read context
+     * @return pinned page
+     */
+    ICachedPage pin(long dpid, IBufferCacheReadContext bcOpCtx) throws HyracksDataException;
+
+    /**
+     * Unpin a page
+     *
+     * @param page    to unpin
+     * @param bcOpCtx read context
+     */
+    void unpin(ICachedPage page, IBufferCacheReadContext bcOpCtx) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java
new file mode 100644
index 0000000..9a7a5d6
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.disk;
+
+public final class NoOpDiskCacheMonitoringService implements IDiskCacheMonitoringService {
+    public static final IDiskCacheMonitoringService INSTANCE = new NoOpDiskCacheMonitoringService();
+
+    private NoOpDiskCacheMonitoringService() {
+    }
+
+    @Override
+    public void start() {
+        // NoOp
+    }
+
+    @Override
+    public void stop() {
+        // NoOp
+    }
+
+    @Override
+    public boolean isEnabled() {
+        return false;
+    }
+
+    @Override
+    public IPhysicalDrive getPhysicalDrive() {
+        return DummyPhysicalDrive.INSTANCE;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
new file mode 100644
index 0000000..3c0d6c9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.disk;
+
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.LocalResource;
+
+public final class NoOpDiskResourceCacheLockNotifier implements IDiskResourceCacheLockNotifier {
+    public static final IDiskResourceCacheLockNotifier INSTANCE = new NoOpDiskResourceCacheLockNotifier();
+
+    private NoOpDiskResourceCacheLockNotifier() {
+    }
+
+    @Override
+    public void onRegister(int datasetId, LocalResource localResource, IIndex index) {
+        // NoOp
+    }
+
+    @Override
+    public void onUnregister(int datasetId, long resourceId) {
+        // NoOp
+    }
+
+    @Override
+    public void onOpen(int datasetId, long resourceId) {
+        // NoOp
+    }
+
+    @Override
+    public void onClose(int datasetId, long resourceId) {
+        // NoOp
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/prefetch/AbstractPrefetchRequest.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/prefetch/AbstractPrefetchRequest.java
new file mode 100644
index 0000000..e9cef95
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/prefetch/AbstractPrefetchRequest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.disk.prefetch;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class AbstractPrefetchRequest implements Runnable {
+    private final ReentrantLock lock;
+    private volatile PrefetchState state;
+
+    protected AbstractPrefetchRequest() {
+        lock = new ReentrantLock();
+        state = PrefetchState.QUEUED;
+    }
+
+    public final void reset() {
+        state = PrefetchState.QUEUED;
+    }
+
+    @Override
+    public final void run() {
+        try {
+            if (state == PrefetchState.QUEUED) {
+                // Acquire a lock so the prefetching op. won't get aborted
+                lock.lock();
+                doPrefetch();
+                state = PrefetchState.FINISHED;
+            }
+        } catch (HyracksDataException e) {
+            onException(e);
+            state = PrefetchState.FAILED;
+        } catch (Throwable e) {
+            onFailure(e);
+            state = PrefetchState.FAILED;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * @return true if aborted or failed, false otherwise
+     */
+    public final boolean waitOrAbort() {
+        // Acquire lock to ensure the prefetching op. is not running
+        lock.lock();
+        try {
+            switch (state) {
+                case QUEUED:
+                    // The request is still queued, abort
+                    state = PrefetchState.ABORTED;
+                case FAILED:
+                    // The request failed
+                    return true;
+                default:
+                    // The request finished
+                    return false;
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    protected abstract void doPrefetch() throws HyracksDataException;
+
+    protected abstract void onException(HyracksDataException e);
+
+    protected abstract void onFailure(Throwable e);
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/prefetch/IPrefetchHandler.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/prefetch/IPrefetchHandler.java
new file mode 100644
index 0000000..1e5af97
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/prefetch/IPrefetchHandler.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.disk.prefetch;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IPrefetchHandler {
+    void request(AbstractPrefetchRequest request) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/prefetch/PrefetchState.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/prefetch/PrefetchState.java
new file mode 100644
index 0000000..29491a9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/prefetch/PrefetchState.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.disk.prefetch;
+
+enum PrefetchState {
+    QUEUED,
+    ABORTED,
+    FAILED,
+    FINISHED
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
index 3d4d80b..3edec4b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
@@ -26,12 +26,15 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.storage.common.buffercache.AbstractBufferedFileIOManager;
 import org.apache.hyracks.storage.common.buffercache.BufferCache;
 import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
 import org.apache.hyracks.storage.common.buffercache.CachedPage;
 import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
+import org.apache.hyracks.storage.common.buffercache.context.page.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.buffercache.context.page.IBufferCacheWriteContext;
 import org.apache.hyracks.storage.common.compression.file.CompressedFileReference;
 import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
 import org.apache.hyracks.storage.common.compression.file.NoOpLAFWriter;
@@ -68,17 +71,19 @@
     }
 
     @Override
-    public void read(CachedPage cPage) throws HyracksDataException {
+    public void read(CachedPage cPage, IBufferCacheReadContext context) throws HyracksDataException {
         final BufferCacheHeaderHelper header = checkoutHeaderHelper();
         try {
-            long bytesRead =
-                    readToBuffer(header.prepareRead(bufferCache.getPageSizeWithHeader()), getFirstPageOffset(cPage));
+            setPageInfo(cPage);
+            IFileHandle handle = getFileHandle();
+            int pageSize = bufferCache.getPageSizeWithHeader();
+            long bytesRead = header.readFromFile(ioManager, handle, getFirstPageOffset(cPage), pageSize);
 
             if (!verifyBytesRead(bufferCache.getPageSizeWithHeader(), bytesRead)) {
                 return;
             }
 
-            final ByteBuffer buf = header.processHeader(cPage);
+            final ByteBuffer buf = context.processHeader(ioManager, this, header, cPage);
             cPage.getBuffer().put(buf);
         } finally {
             returnHeaderHelper(header);
@@ -87,6 +92,11 @@
         readExtraPages(cPage);
     }
 
+    private void setPageInfo(CachedPage cPage) {
+        cPage.setCompressedPageOffset(getFirstPageOffset(cPage));
+        cPage.setCompressedPageSize(bufferCache.getPageSize());
+    }
+
     private void readExtraPages(CachedPage cPage) throws HyracksDataException {
         final int totalPages = cPage.getFrameSizeMultiplier();
         if (totalPages > 1) {
@@ -98,22 +108,24 @@
     }
 
     @Override
-    protected void write(CachedPage cPage, BufferCacheHeaderHelper header, int totalPages, int extraBlockPageId)
-            throws HyracksDataException {
+    protected void write(CachedPage cPage, BufferCacheHeaderHelper header, int totalPages, int extraBlockPageId,
+            IBufferCacheWriteContext context) throws HyracksDataException {
         final ByteBuffer buf = cPage.getBuffer();
         final boolean contiguousLargePages = getPageId(cPage.getDiskPageId()) + 1 == extraBlockPageId;
+        IFileHandle handle = getFileHandle();
         long bytesWritten;
         try {
             buf.limit(contiguousLargePages ? bufferCache.getPageSize() * totalPages : bufferCache.getPageSize());
             buf.position(0);
-            bytesWritten = writeToFile(header.prepareWrite(cPage), getFirstPageOffset(cPage));
+            ByteBuffer[] buffers = header.prepareWrite(cPage);
+            bytesWritten = context.write(ioManager, handle, getFirstPageOffset(cPage), buffers);
         } finally {
             returnHeaderHelper(header);
         }
 
         if (totalPages > 1 && !contiguousLargePages) {
             buf.limit(totalPages * bufferCache.getPageSize());
-            bytesWritten += writeToFile(buf, getExtraPageOffset(cPage));
+            bytesWritten += writeExtraToFile(buf, getExtraPageOffset(cPage));
         }
 
         final int expectedWritten = bufferCache.getPageSizeWithHeader() + bufferCache.getPageSize() * (totalPages - 1);
@@ -121,6 +133,11 @@
     }
 
     @Override
+    public long getStartPageOffset(int pageId) throws HyracksDataException {
+        return (long) pageId * bufferCache.getPageSizeWithHeader();
+    }
+
+    @Override
     public int getNumberOfPages() throws HyracksDataException {
         if (DEBUG) {
             assert getFileSize() % bufferCache.getPageSizeWithHeader() == 0;
@@ -134,6 +151,12 @@
     }
 
     @Override
+    public long getPagesTotalSize(int startPageId, int numberOfPages) throws HyracksDataException {
+        // This could be an overestimate as we cannot determine for sure as extra pages do not have a header
+        return (long) numberOfPages * bufferCache.getPageSizeWithHeader();
+    }
+
+    @Override
     protected long getFirstPageOffset(CachedPage cPage) {
         return getPageOffset(getPageId(cPage.getDiskPageId()));
     }
@@ -143,10 +166,6 @@
         return getPageOffset(cPage.getExtraBlockPageId());
     }
 
-    private long getPageOffset(long pageId) {
-        return pageId * bufferCache.getPageSizeWithHeader();
-    }
-
     public static long getDiskPageId(int fileId, int pageId) {
         return (((long) fileId) << 32) + pageId;
     }
@@ -169,4 +188,8 @@
         }
         return new BufferedFileHandle(fileId, bufferCache, ioManager, headerPageCache, pageReplacementStrategy);
     }
+
+    private long getPageOffset(int pageId) {
+        return (long) pageId * bufferCache.getPageSizeWithHeader();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
index 01811c7..ed71a94 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
@@ -24,11 +24,14 @@
 import org.apache.hyracks.api.compression.ICompressorDecompressor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.storage.common.buffercache.BufferCache;
 import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
 import org.apache.hyracks.storage.common.buffercache.CachedPage;
 import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
+import org.apache.hyracks.storage.common.buffercache.context.page.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.buffercache.context.page.IBufferCacheWriteContext;
 import org.apache.hyracks.storage.common.compression.file.CompressedFileManager;
 import org.apache.hyracks.storage.common.compression.file.CompressedFileReference;
 import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
@@ -45,16 +48,18 @@
     }
 
     @Override
-    public void read(CachedPage cPage) throws HyracksDataException {
+    public void read(CachedPage cPage, IBufferCacheReadContext context) throws HyracksDataException {
         final BufferCacheHeaderHelper header = checkoutHeaderHelper();
         try {
             compressedFileManager.setCompressedPageInfo(cPage);
-            long bytesRead = readToBuffer(header.prepareRead(cPage.getCompressedPageSize()), getFirstPageOffset(cPage));
-
+            IFileHandle handle = getFileHandle();
+            int size = cPage.getCompressedPageSize();
+            long bytesRead = header.readFromFile(ioManager, handle, getFirstPageOffset(cPage), size);
             if (!verifyBytesRead(cPage.getCompressedPageSize(), bytesRead)) {
                 return;
             }
-            final ByteBuffer cBuffer = header.processHeader(cPage);
+
+            final ByteBuffer cBuffer = context.processHeader(ioManager, this, header, cPage);
             final ByteBuffer uBuffer = cPage.getBuffer();
             fixBufferPointers(uBuffer, 0);
             if (cPage.getCompressedPageSize() < bufferCache.getPageSizeWithHeader()) {
@@ -93,9 +98,10 @@
     }
 
     @Override
-    protected void write(CachedPage cPage, BufferCacheHeaderHelper header, int totalPages, int extraBlockPageId)
-            throws HyracksDataException {
+    protected void write(CachedPage cPage, BufferCacheHeaderHelper header, int totalPages, int extraBlockPageId,
+            IBufferCacheWriteContext context) throws HyracksDataException {
         try {
+            IFileHandle handle = getFileHandle();
             final ByteBuffer cBuffer = header.prepareWrite(cPage, getRequiredBufferSize());
             final ByteBuffer uBuffer = cPage.getBuffer();
             final long pageId = cPage.getDiskPageId();
@@ -103,18 +109,22 @@
             final long bytesWritten;
             final long expectedBytesWritten;
 
-            fixBufferPointers(uBuffer, 0);
+            if (cPage.isLargePage()) {
+                fixBufferPointers(uBuffer, 0);
+            } else {
+                uBuffer.position(0);
+            }
             if (compressToWriteBuffer(uBuffer, cBuffer) < bufferCache.getPageSize()) {
                 cBuffer.position(0);
                 final long offset = compressedFileManager.writePageInfo(pageId, cBuffer.remaining());
                 expectedBytesWritten = cBuffer.limit();
-                bytesWritten = writeToFile(cBuffer, offset);
+                bytesWritten = context.write(ioManager, handle, offset, cBuffer);
             } else {
                 //Compression did not gain any savings
                 final ByteBuffer[] buffers = header.prepareWrite(cPage);
                 final long offset = compressedFileManager.writePageInfo(pageId, bufferCache.getPageSizeWithHeader());
                 expectedBytesWritten = buffers[0].limit() + (long) buffers[1].limit();
-                bytesWritten = writeToFile(buffers, offset);
+                bytesWritten = context.write(ioManager, handle, offset, buffers);
             }
 
             verifyBytesWritten(expectedBytesWritten, bytesWritten);
@@ -147,7 +157,7 @@
             final int length = writeBuffer.remaining();
             final long offset = compressedFileManager.writeExtraPageInfo(extraBlockPageId, length, i - 1);
             expectedBytesWritten += length;
-            bytesWritten += writeToFile(writeBuffer, offset);
+            bytesWritten += writeExtraToFile(writeBuffer, offset);
         }
 
         verifyBytesWritten(expectedBytesWritten, bytesWritten);
@@ -199,6 +209,11 @@
     }
 
     @Override
+    public long getStartPageOffset(int pageId) throws HyracksDataException {
+        return compressedFileManager.getPageOffset(pageId);
+    }
+
+    @Override
     public int getNumberOfPages() {
         return compressedFileManager.getNumberOfPages();
     }
@@ -218,6 +233,11 @@
         return compressedFileManager.getCompressedPageWriter();
     }
 
+    @Override
+    public long getPagesTotalSize(int startPageId, int numberOfPages) throws HyracksDataException {
+        return compressedFileManager.getTotalCompressedSize(startPageId, numberOfPages);
+    }
+
     /* ********************************
      * Compression methods
      * ********************************
@@ -233,7 +253,6 @@
     private void uncompressToPageBuffer(ByteBuffer cBuffer, ByteBuffer uBuffer) throws HyracksDataException {
         final ICompressorDecompressor compDecomp = compressedFileManager.getCompressorDecompressor();
         compDecomp.uncompress(cBuffer, uBuffer);
-        verifyUncompressionSize(bufferCache.getPageSize(), uBuffer.remaining());
     }
 
     private int compressToWriteBuffer(ByteBuffer uBuffer, ByteBuffer cBuffer) throws HyracksDataException {
@@ -246,11 +265,4 @@
         final ICompressorDecompressor compDecomp = compressedFileManager.getCompressorDecompressor();
         return compDecomp.computeCompressedBufferSize(bufferCache.getPageSize());
     }
-
-    private void verifyUncompressionSize(int expected, int actual) {
-        if (expected != actual) {
-            throwException("Uncompressed", expected, actual);
-        }
-    }
-
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
index 00a041f..8e002b5 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
@@ -39,6 +39,7 @@
 import org.apache.hyracks.storage.common.IResourceLifecycleManager;
 import org.apache.hyracks.storage.common.buffercache.BufferCache;
 import org.apache.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
+import org.apache.hyracks.storage.common.buffercache.DefaultDiskCachedPageAllocator;
 import org.apache.hyracks.storage.common.buffercache.DelayPageCleanerPolicy;
 import org.apache.hyracks.storage.common.buffercache.HeapBufferAllocator;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
@@ -146,7 +147,8 @@
             return bufferCache;
         }
         ICacheMemoryAllocator allocator = new HeapBufferAllocator();
-        IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator, pageSize, numPages);
+        IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator,
+                DefaultDiskCachedPageAllocator.INSTANCE, pageSize, numPages);
         IFileMapProvider fileMapProvider = getFileMapProvider();
         bufferCache = new BufferCache(ioManager, prs, new DelayPageCleanerPolicy(1000),
                 (IFileMapManager) fileMapProvider, maxOpenFiles, 10, threadFactory, new HashMap<>(),
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/TestOnly.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/TestOnly.java
new file mode 100644
index 0000000..458898e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/TestOnly.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util.annotations;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.SOURCE)
+public @interface TestOnly {
+}
diff --git a/hyracks-fullstack/hyracks/pom.xml b/hyracks-fullstack/hyracks/pom.xml
index 96bf131..ff390a9 100644
--- a/hyracks-fullstack/hyracks/pom.xml
+++ b/hyracks-fullstack/hyracks/pom.xml
@@ -101,5 +101,6 @@
     <module>hyracks-hdfs</module>
     <module>hyracks-dist</module>
     <module>hyracks-http</module>
+    <module>hyracks-cloud</module>
   </modules>
 </project>