[ASTERIXDB-3389][STO] Support caching/evicting columns

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
Prepare columnar indexes to support caching and
evicting columns.

Change-Id: Ib557608b0b25219ffc00a76325091a92f5e77698
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18260
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
index 43aa873..bbd8d87 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
@@ -93,6 +93,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-cloud</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
     </dependency>
@@ -104,5 +109,10 @@
       <groupId>it.unimi.dsi</groupId>
       <artifactId>fastutil-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/ColumnProjectorType.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/ColumnProjectorType.java
new file mode 100644
index 0000000..29258b5
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/ColumnProjectorType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.api.projection;
+
+public enum ColumnProjectorType {
+    MERGE,
+    QUERY,
+    MODIFY
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/IColumnProjectionInfo.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/IColumnProjectionInfo.java
index 641f704..b1bfe87 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/IColumnProjectionInfo.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/IColumnProjectionInfo.java
@@ -48,4 +48,9 @@
      * @return number of filtered columns
      */
     int getNumberOfFilteredColumns();
+
+    /**
+     * @return the type of {@link IColumnTupleProjector} that created this projection info
+     */
+    ColumnProjectorType getProjectorType();
 }
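A hypothetical helper (the class and method names are assumptions, not part of this change) illustrating the intended use of the new accessor: with the projector type attached to the projection info, the read path can tell user queries apart from maintenance reads without threading an extra flag through. In this change, only QUERY projections feed the sweep planner's access statistics, since MERGE and MODIFY reads would skew column access times.

import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;

final class ProjectionInfoUtil {
    private ProjectionInfoUtil() {
    }

    // Only user queries should contribute to the sweep planner's access
    // statistics; merge/modify reads are maintenance traffic, not column heat.
    static boolean shouldRecordAccess(IColumnProjectionInfo info) {
        return info.getProjectorType() == ColumnProjectorType.QUERY;
    }
}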
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/CloudColumnIndexDiskCacheManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/CloudColumnIndexDiskCacheManager.java
new file mode 100644
index 0000000..38a1713
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/CloudColumnIndexDiskCacheManager.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud;
+
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.sweeper.SweepContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnReadContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read.CloudColumnReadContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read.DefaultColumnReadContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.write.CloudColumnWriteContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweepPlanner;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweeper;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.disk.ISweepContext;
+
+/**
+ * The disk manager cannot be shared among different partitions as columns are local to each partition.
+ * For example, column 9 in partition 0 corresponds to "salary" while column 9 in partition 1 corresponds to "age".
+ */
+public final class CloudColumnIndexDiskCacheManager implements IColumnIndexDiskCacheManager {
+    private final IColumnTupleProjector sweepProjector;
+    private final IPhysicalDrive drive;
+    private final ColumnSweepPlanner planner;
+    private final ColumnSweeper sweeper;
+
+    public CloudColumnIndexDiskCacheManager(int numberOfPrimaryKeys, IColumnTupleProjector sweepProjector,
+            IPhysicalDrive drive) {
+        this.sweepProjector = sweepProjector;
+        this.drive = drive;
+        planner = new ColumnSweepPlanner(numberOfPrimaryKeys, System::nanoTime);
+        sweeper = new ColumnSweeper(numberOfPrimaryKeys);
+    }
+
+    @Override
+    public void activate(int numberOfColumns, List<ILSMDiskComponent> diskComponents, IBufferCache bufferCache)
+            throws HyracksDataException {
+        planner.onActivate(numberOfColumns, diskComponents, sweepProjector, bufferCache);
+    }
+
+    @Override
+    public IColumnWriteContext createWriteContext(int numberOfColumns, LSMIOOperationType operationType) {
+        return new CloudColumnWriteContext(drive, planner, numberOfColumns);
+    }
+
+    @Override
+    public IColumnReadContext createReadContext(IColumnProjectionInfo projectionInfo) {
+        ColumnProjectorType projectorType = projectionInfo.getProjectorType();
+        if (projectorType == ColumnProjectorType.QUERY) {
+            planner.access(projectionInfo, drive.hasSpace());
+        } else if (projectorType == ColumnProjectorType.MODIFY) {
+            planner.setIndexedColumns(projectionInfo);
+            // Requested (and indexed) columns will be persisted if space permits
+            return DefaultColumnReadContext.INSTANCE;
+        }
+        return new CloudColumnReadContext(projectionInfo, drive, planner.getPlanCopy());
+    }
+
+    @Override
+    public boolean isActive() {
+        return planner.isActive();
+    }
+
+    @Override
+    public boolean isSweepable() {
+        return true;
+    }
+
+    @Override
+    public boolean prepareSweepPlan() {
+        return planner.plan();
+    }
+
+    @Override
+    public long sweep(ISweepContext context) throws HyracksDataException {
+        SweepContext sweepContext = (SweepContext) context;
+        return sweeper.sweep(planner.getPlanCopy(), sweepContext, sweepProjector);
+    }
+}
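A minimal sketch of how a sweeper thread is expected to drive this manager, assuming the plan-then-sweep protocol implied by the interface (the driver class is hypothetical, and the package of IColumnIndexDiskCacheManager is assumed to match this file's):

import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IColumnIndexDiskCacheManager;
import org.apache.hyracks.storage.common.disk.ISweepContext;

final class SweepDriver {
    private SweepDriver() {
    }

    static long sweepIfNeeded(IColumnIndexDiskCacheManager manager, ISweepContext context)
            throws HyracksDataException {
        if (!manager.isActive() || !manager.isSweepable()) {
            // Not activated yet, or the index does not support sweeping
            return 0L;
        }
        if (!manager.prepareSweepPlan()) {
            // The planner found nothing worth evicting
            return 0L;
        }
        return manager.sweep(context);
    }
}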
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
new file mode 100644
index 0000000..9fc60fa
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud;
+
+import static org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweeperUtil.EMPTY;
+import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getColumnPageIndex;
+import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getNumberOfPages;
+
+import java.util.BitSet;
+
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read.CloudColumnReadContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweepPlanner;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweeper;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+
+import it.unimi.dsi.fastutil.ints.IntArrays;
+import it.unimi.dsi.fastutil.longs.LongArrays;
+import it.unimi.dsi.fastutil.longs.LongComparator;
+
+/**
+ * Computes column offsets, lengths, and pages
+ */
+public final class ColumnRanges {
+    private static final LongComparator OFFSET_COMPARATOR =
+            (x, y) -> Integer.compare(getOffsetFromPair(x), getOffsetFromPair(y));
+    private final int numberOfPrimaryKeys;
+
+    // For eviction
+    private final BitSet nonEvictablePages;
+
+    // For Query
+    private final BitSet evictablePages;
+    private final BitSet cloudOnlyPages;
+
+    private ColumnBTreeReadLeafFrame leafFrame;
+    private long[] offsetColumnIndexPairs;
+    private int[] lengths;
+    private int[] columnsOrder;
+    private int pageZeroId;
+
+    public ColumnRanges(int numberOfPrimaryKeys) {
+        this.numberOfPrimaryKeys = numberOfPrimaryKeys;
+
+        offsetColumnIndexPairs = new long[0];
+        lengths = new int[0];
+        columnsOrder = new int[0];
+
+        nonEvictablePages = new BitSet();
+
+        evictablePages = new BitSet();
+        cloudOnlyPages = new BitSet();
+    }
+
+    /**
+     * @return number of primary keys
+     */
+    public int getNumberOfPrimaryKeys() {
+        return numberOfPrimaryKeys;
+    }
+
+    /**
+     * Reset ranges for initializing {@link ColumnSweepPlanner}
+     *
+     * @param leafFrame to compute the ranges for
+     */
+    public void reset(ColumnBTreeReadLeafFrame leafFrame) {
+        reset(leafFrame, EMPTY, EMPTY, EMPTY);
+    }
+
+    /**
+     * Reset column ranges for {@link ColumnSweeper}
+     *
+     * @param leafFrame to compute the ranges for
+     * @param plan      eviction plan
+     */
+    public void reset(ColumnBTreeReadLeafFrame leafFrame, BitSet plan) {
+        reset(leafFrame, plan, EMPTY, EMPTY);
+    }
+
+    /**
+     * Reset ranges for {@link CloudColumnReadContext}
+     *
+     * @param leafFrame        to compute the ranges for
+     * @param requestedColumns required columns
+     * @param evictableColumns columns that are or will be evicted
+     * @param cloudOnlyColumns locked columns that cannot be read from a local disk
+     */
+    public void reset(ColumnBTreeReadLeafFrame leafFrame, BitSet requestedColumns, BitSet evictableColumns,
+            BitSet cloudOnlyColumns) {
+        // Set leafFrame
+        this.leafFrame = leafFrame;
+        // Ensure array capacities (based on the leafFrame's column and page counts)
+        init();
+
+        // Get the number of columns in the mega leaf node
+        int numberOfColumns = leafFrame.getNumberOfColumns();
+        for (int i = 0; i < numberOfColumns; i++) {
+            long offset = leafFrame.getColumnOffset(i);
+            // Set the high 32 bits to the offset and the low 32 bits to the column index
+            offsetColumnIndexPairs[i] = (offset << 32) + i;
+        }
+
+        // Set an artificial offset to determine the last column's length
+        offsetColumnIndexPairs[numberOfColumns] = ((long) leafFrame.getMegaLeafNodeLengthInBytes() << 32) + numberOfColumns;
+
+        // Sort the pairs by offset (i.e., lowest offset first)
+        LongArrays.stableSort(offsetColumnIndexPairs, 0, numberOfColumns, OFFSET_COMPARATOR);
+
+        int columnOrdinal = 0;
+        for (int i = 0; i < numberOfColumns; i++) {
+            int columnIndex = getColumnIndexFromPair(offsetColumnIndexPairs[i]);
+            int offset = getOffsetFromPair(offsetColumnIndexPairs[i]);
+            int nextOffset = getOffsetFromPair(offsetColumnIndexPairs[i + 1]);
+
+            // Compute the column's length in bytes (set 0 for PKs)
+            int length = columnIndex < numberOfPrimaryKeys ? 0 : nextOffset - offset;
+            lengths[columnIndex] = length;
+
+            // Get start page ID (given the computed length above)
+            int startPageId = getColumnStartPageIndex(columnIndex);
+            // Get the number of pages (given the computed length above)
+            int numberOfPages = getColumnNumberOfPages(columnIndex);
+
+            if (columnIndex >= numberOfPrimaryKeys && requestedColumns.get(columnIndex)) {
+                // Set column index
+                columnsOrder[columnOrdinal++] = columnIndex;
+                // Compute cloud-only and evictable pages
+                setCloudOnlyAndEvictablePages(columnIndex, cloudOnlyColumns, evictableColumns, startPageId,
+                        numberOfPages);
+                // A requested column. Keep its pages as requested
+                continue;
+            }
+
+            // Mark the page as non-evictable
+            for (int j = startPageId; j < startPageId + numberOfPages; j++) {
+                nonEvictablePages.set(j);
+            }
+        }
+
+        // Bound nonEvictablePages to the number of pages in the mega leaf node
+        nonEvictablePages.set(leafFrame.getMegaLeafNodeNumberOfPages());
+        // Set -1 to indicate the end of the column order
+        columnsOrder[columnOrdinal] = -1;
+    }
+
+    /**
+     * First page of a column
+     *
+     * @param columnIndex column index
+     * @return pageID
+     */
+    public int getColumnStartPageIndex(int columnIndex) {
+        int pageSize = leafFrame.getBuffer().capacity();
+        return getColumnPageIndex(leafFrame.getColumnOffset(columnIndex), pageSize);
+    }
+
+    /**
+     * Length of a column in pages
+     *
+     * @param columnIndex column index
+     * @return number of pages
+     */
+    public int getColumnNumberOfPages(int columnIndex) {
+        int pageSize = leafFrame.getBuffer().capacity();
+        int numberOfPages = getNumberOfPages(getColumnLength(columnIndex), pageSize);
+        return numberOfPages == 0 ? 1 : numberOfPages;
+    }
+
+    /**
+     * Length of a column in bytes
+     *
+     * @param columnIndex column index
+     * @return number of bytes
+     */
+    public int getColumnLength(int columnIndex) {
+        return lengths[columnIndex];
+    }
+
+    /**
+     * Returns true if the page is meant to be read from the cloud only
+     *
+     * @param pageId page ID
+     * @return true if the page should be read from the cloud, false otherwise
+     * @see #reset(ColumnBTreeReadLeafFrame, BitSet, BitSet, BitSet)
+     */
+    public boolean isCloudOnly(int pageId) {
+        // Compute the relative page ID for this mega leaf node
+        int relativePageId = pageId - pageZeroId;
+        return cloudOnlyPages.get(relativePageId);
+    }
+
+    /**
+     * Whether the page has been or will be evicted
+     *
+     * @param pageId page ID
+     * @return true if the page was or will be evicted, false otherwise
+     */
+    public boolean isEvictable(int pageId) {
+        int relativePageId = pageId - pageZeroId;
+        return evictablePages.get(relativePageId);
+    }
+
+    /**
+     * @return {@link BitSet} of all non-evictable pages
+     */
+    public BitSet getNonEvictablePages() {
+        return nonEvictablePages;
+    }
+
+    /**
+     * @return the order in which columns should be read to ensure (semi-)sequential access, where
+     * sequential means page X is read before page Y, for all X and Y such that X < Y
+     */
+    public int[] getColumnsOrder() {
+        return columnsOrder;
+    }
+
+    private void init() {
+        int numberOfColumns = leafFrame.getNumberOfColumns();
+        offsetColumnIndexPairs = LongArrays.ensureCapacity(offsetColumnIndexPairs, numberOfColumns + 1, 0);
+        lengths = IntArrays.ensureCapacity(lengths, numberOfColumns, 0);
+        columnsOrder = IntArrays.ensureCapacity(columnsOrder, numberOfColumns + 1, 0);
+        nonEvictablePages.clear();
+        evictablePages.clear();
+        cloudOnlyPages.clear();
+        pageZeroId = leafFrame.getPageId();
+    }
+
+    private static int getOffsetFromPair(long pair) {
+        return (int) (pair >> 32);
+    }
+
+    private static int getColumnIndexFromPair(long pair) {
+        return (int) pair;
+    }
+
+    private void setCloudOnlyAndEvictablePages(int columnIndex, BitSet cloudOnlyColumns, BitSet evictableColumns,
+            int startPageId, int numberOfPages) {
+        if (evictableColumns == EMPTY && cloudOnlyColumns == EMPTY) {
+            return;
+        }
+
+        // Find pages that are meant to be read from the cloud only or are evictable
+        boolean cloudOnly = cloudOnlyColumns.get(columnIndex);
+        boolean evictable = evictableColumns.get(columnIndex);
+        if (cloudOnly || evictable) {
+            for (int j = startPageId; j < startPageId + numberOfPages; j++) {
+                if (cloudOnly) {
+                    cloudOnlyPages.set(j);
+                } else {
+                    evictablePages.set(j);
+                }
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        int numberOfPages = leafFrame.getMegaLeafNodeNumberOfPages();
+        builder.append("       ");
+        for (int i = 0; i < numberOfPages; i++) {
+            builder.append(String.format("%02d", i));
+            builder.append("  ");
+        }
+
+        builder.append('\n');
+        for (int i = 0; i < leafFrame.getNumberOfColumns(); i++) {
+            builder.append(String.format("%03d", i));
+            builder.append(":");
+            int startPageId = getColumnStartPageIndex(i);
+            int columnPagesCount = getColumnNumberOfPages(i);
+            printColumnPages(builder, numberOfPages, startPageId, columnPagesCount);
+        }
+
+        builder.append("nonEvictablePages: ");
+        builder.append(nonEvictablePages);
+        builder.append('\n');
+        builder.append("evictablePages: ");
+        builder.append(evictablePages);
+        builder.append('\n');
+        builder.append("cloudOnlyPages: ");
+        builder.append(cloudOnlyPages);
+
+        return builder.toString();
+    }
+
+    private void printColumnPages(StringBuilder builder, int numberOfPages, int startPageId, int columnPagesCount) {
+        for (int i = 0; i < numberOfPages; i++) {
+            builder.append("   ");
+            if (i >= startPageId && i < startPageId + columnPagesCount) {
+                builder.append(1);
+            } else {
+                builder.append(0);
+            }
+        }
+        builder.append('\n');
+    }
+
+}
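The core trick in reset() above is packing each (offset, columnIndex) pair into a single long, with the offset in the high 32 bits, so sorting the packed longs orders columns by their position in the mega leaf node; each column's length then falls out as the difference between consecutive offsets. A standalone sketch of the encoding, mirroring getOffsetFromPair/getColumnIndexFromPair (plain Arrays.sort stands in for the comparator-based stable sort and assumes non-negative offsets):

import java.util.Arrays;

public final class OffsetPairDemo {
    static long pack(long offset, int columnIndex) {
        // High 32 bits: offset; low 32 bits: column index
        return (offset << 32) + columnIndex;
    }

    static int offsetOf(long pair) {
        return (int) (pair >> 32);
    }

    static int columnIndexOf(long pair) {
        return (int) pair;
    }

    public static void main(String[] args) {
        // Columns 0, 1, and 2 start at offsets 0, 8192, and 4096, respectively
        long[] pairs = { pack(0, 0), pack(8192, 1), pack(4096, 2) };
        // Offsets occupy the high bits, so sorting orders columns by offset
        Arrays.sort(pairs);
        for (int i = 0; i < pairs.length - 1; i++) {
            int length = offsetOf(pairs[i + 1]) - offsetOf(pairs[i]);
            System.out.println("column " + columnIndexOf(pairs[i]) + " length = " + length);
        }
        // Prints: column 0 length = 4096, then column 2 length = 4096
    }
}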
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
new file mode 100644
index 0000000..af69fe5
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
+
+import static org.apache.hyracks.cloud.buffercache.context.DefaultCloudReadContext.readAndPersistPage;
+import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MODIFY;
+import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.QUERY;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
+import org.apache.hyracks.cloud.buffercache.page.ISweepLockInfo;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnReadContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweepLockInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
+
+@NotThreadSafe
+public final class CloudColumnReadContext implements IColumnReadContext {
+    private final ColumnProjectorType operation;
+    private final IPhysicalDrive drive;
+    private final BitSet plan;
+    private final BitSet cloudOnlyColumns;
+    private final ColumnRanges columnRanges;
+    private final CloudMegaPageReadContext columnCtx;
+    private final List<ICachedPage> pinnedPages;
+    private final BitSet projectedColumns;
+
+    public CloudColumnReadContext(IColumnProjectionInfo projectionInfo, IPhysicalDrive drive, BitSet plan) {
+        this.operation = projectionInfo.getProjectorType();
+        this.drive = drive;
+        this.plan = plan;
+        columnRanges = new ColumnRanges(projectionInfo.getNumberOfPrimaryKeys());
+        cloudOnlyColumns = new BitSet();
+        columnCtx = new CloudMegaPageReadContext(operation, columnRanges, drive);
+        pinnedPages = new ArrayList<>();
+        projectedColumns = new BitSet();
+        if (operation == QUERY || operation == MODIFY) {
+            for (int i = 0; i < projectionInfo.getNumberOfProjectedColumns(); i++) {
+                int columnIndex = projectionInfo.getColumnIndex(i);
+                if (columnIndex >= 0) {
+                    projectedColumns.set(columnIndex);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void onPin(ICachedPage page) {
+        CloudCachedPage cloudPage = (CloudCachedPage) page;
+        ISweepLockInfo lockTest = cloudPage.beforeRead();
+        if (lockTest.isLocked()) {
+            ColumnSweepLockInfo lockedColumns = (ColumnSweepLockInfo) lockTest;
+            lockedColumns.getLockedColumns(cloudOnlyColumns);
+        }
+    }
+
+    @Override
+    public void onUnpin(ICachedPage page) {
+        CloudCachedPage cloudPage = (CloudCachedPage) page;
+        cloudPage.afterRead();
+    }
+
+    @Override
+    public boolean isNewPage() {
+        return false;
+    }
+
+    @Override
+    public boolean incrementStats() {
+        return true;
+    }
+
+    @Override
+    public ByteBuffer processHeader(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
+            CachedPage cPage) throws HyracksDataException {
+        // Page zero is always persisted (regardless of the plan) if free space permits
+        return readAndPersistPage(ioManager, fileHandle, header, cPage, drive.hasSpace());
+    }
+
+    @Override
+    public ICachedPage pinNext(ColumnBTreeReadLeafFrame leafFrame, IBufferCache bufferCache, int fileId)
+            throws HyracksDataException {
+        // TODO do we support prefetching?
+        ICachedPage nextPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, leafFrame.getNextLeaf()), this);
+        release(bufferCache);
+        bufferCache.unpin(leafFrame.getPage(), this);
+        leafFrame.setPage(nextPage);
+        return nextPage;
+    }
+
+    @Override
+    public void prepareColumns(ColumnBTreeReadLeafFrame leafFrame, IBufferCache bufferCache, int fileId)
+            throws HyracksDataException {
+        if (leafFrame.getTupleCount() == 0) {
+            return;
+        }
+
+        // TODO handle prefetch if supported
+
+        columnRanges.reset(leafFrame, projectedColumns, plan, cloudOnlyColumns);
+        int pageZeroId = leafFrame.getPageId();
+        int[] columnsOrders = columnRanges.getColumnsOrder();
+        int i = 0;
+        int columnIndex = columnsOrders[i];
+        while (columnIndex > -1) {
+            if (columnIndex < columnRanges.getNumberOfPrimaryKeys()) {
+                columnIndex = columnsOrders[++i];
+                continue;
+            }
+
+            int startPageId = columnRanges.getColumnStartPageIndex(columnIndex);
+            // Will be increased if the next column's pages are contiguous with this column's pages
+            int numberOfPages = columnRanges.getColumnNumberOfPages(columnIndex);
+
+            // Advance to the next column to check if it has contiguous pages
+            columnIndex = columnsOrders[++i];
+            while (columnIndex > -1) {
+                // Get the next column's start page ID
+                int nextStartPageId = columnRanges.getColumnStartPageIndex(columnIndex);
+                if (nextStartPageId > startPageId + numberOfPages + 1) {
+                    // The next startPageId is not contiguous, stop.
+                    break;
+                }
+
+                // The page after the next column's last page (its exclusive end)
+                int nextLastPage = nextStartPageId + columnRanges.getColumnNumberOfPages(columnIndex);
+                // The next column's pages are contiguous. Combine its ranges with the previous one.
+                numberOfPages = nextLastPage - startPageId;
+                // Advance to the next column
+                columnIndex = columnsOrders[++i];
+            }
+
+            columnCtx.prepare(numberOfPages);
+            pin(bufferCache, fileId, pageZeroId, startPageId, numberOfPages);
+        }
+    }
+
+    private void pin(IBufferCache bufferCache, int fileId, int pageZeroId, int start, int numOfRequestedPages)
+            throws HyracksDataException {
+        for (int i = start; i < start + numOfRequestedPages; i++) {
+            long dpid = BufferedFileHandle.getDiskPageId(fileId, pageZeroId + i);
+            pinnedPages.add(bufferCache.pin(dpid, columnCtx));
+        }
+    }
+
+    @Override
+    public void release(IBufferCache bufferCache) throws HyracksDataException {
+        // Release might differ in the future if prefetching is supported
+        close(bufferCache);
+    }
+
+    @Override
+    public void close(IBufferCache bufferCache) throws HyracksDataException {
+        release(pinnedPages, bufferCache, columnCtx);
+        columnCtx.close();
+    }
+
+    private static void release(List<ICachedPage> pinnedPages, IBufferCache bufferCache, IBufferCacheReadContext ctx)
+            throws HyracksDataException {
+        for (int i = 0; i < pinnedPages.size(); i++) {
+            bufferCache.unpin(pinnedPages.get(i), ctx);
+        }
+        pinnedPages.clear();
+    }
+}
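prepareColumns() above coalesces the page ranges of adjacent requested columns so that each cloud request covers one contiguous run of pages rather than one request per column. A simplified, self-contained sketch of that coalescing; the one-page tolerance mirrors the 'startPageId + numberOfPages + 1' check above, and the primary-key filtering is omitted:

import java.util.ArrayList;
import java.util.List;

public final class RangeCoalescingDemo {
    // Each range is {startPage, pageCount}; input must be sorted by start page
    static List<int[]> coalesce(List<int[]> sortedRanges) {
        List<int[]> merged = new ArrayList<>();
        int start = -1;
        int count = 0;
        for (int[] range : sortedRanges) {
            if (start >= 0 && range[0] <= start + count + 1) {
                // Contiguous with the current run (or sharing a boundary page): extend it
                count = range[0] + range[1] - start;
            } else {
                if (start >= 0) {
                    merged.add(new int[] { start, count });
                }
                start = range[0];
                count = range[1];
            }
        }
        if (start >= 0) {
            merged.add(new int[] { start, count });
        }
        return merged;
    }

    public static void main(String[] args) {
        // Columns occupying pages [1-2], [3-4], and [7]
        List<int[]> ranges = List.of(new int[] { 1, 2 }, new int[] { 3, 2 }, new int[] { 7, 1 });
        for (int[] run : coalesce(ranges)) {
            System.out.println("pin pages " + run[0] + ".." + (run[0] + run[1] - 1));
        }
        // Two runs (pages 1..4 and 7..7), so two cloud reads instead of three
    }
}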
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
new file mode 100644
index 0000000..0f0b0b9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
+
+import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MERGE;
+import static org.apache.hyracks.storage.common.buffercache.IBufferCache.RESERVED_HEADER_BYTES;
+import static org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider.DEFAULT;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.buffercache.context.BufferCacheCloudReadContextUtil;
+import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
+import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+@NotThreadSafe
+final class CloudMegaPageReadContext implements IBufferCacheReadContext {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final ColumnProjectorType operation;
+    private final ColumnRanges columnRanges;
+    private final IPhysicalDrive drive;
+    private int numberOfContiguousPages;
+    private int pageCounter;
+    private InputStream gapStream;
+
+    CloudMegaPageReadContext(ColumnProjectorType operation, ColumnRanges columnRanges, IPhysicalDrive drive) {
+        this.operation = operation;
+        this.columnRanges = columnRanges;
+        this.drive = drive;
+    }
+
+    public void prepare(int numberOfContiguousPages) throws HyracksDataException {
+        close();
+        this.numberOfContiguousPages = numberOfContiguousPages;
+        pageCounter = 0;
+    }
+
+    @Override
+    public void onPin(ICachedPage page) throws HyracksDataException {
+        CloudCachedPage cachedPage = (CloudCachedPage) page;
+        if (gapStream != null && cachedPage.skipCloudStream()) {
+            /*
+             * This page is requested, but the buffer cache already has a valid copy of it in memory, and the
+             * page itself was requested to be read from the cloud. Since the in-memory copy is valid, no buffer
+             * cache read() will be performed. As the buffer cache read() is also responsible for persisting the
+             * bytes read from the cloud, we could end up writing this page's bytes at another page's position.
+             * Therefore, skip this page's bytes in the cloud stream.
+             */
+            try {
+                long remaining = cachedPage.getCompressedPageSize();
+                while (remaining > 0) {
+                    remaining -= gapStream.skip(remaining);
+                }
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
+        }
+    }
+
+    @Override
+    public void onUnpin(ICachedPage page) {
+        // NoOp
+    }
+
+    @Override
+    public boolean isNewPage() {
+        return false;
+    }
+
+    @Override
+    public boolean incrementStats() {
+        return true;
+    }
+
+    @Override
+    public ByteBuffer processHeader(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
+            CachedPage cPage) throws HyracksDataException {
+        boolean empty = BufferCacheCloudReadContextUtil.isEmpty(header);
+        int pageId = BufferedFileHandle.getPageId(cPage.getDiskPageId());
+        boolean cloudOnly = columnRanges.isCloudOnly(pageId);
+        ByteBuffer buffer;
+        if (empty || cloudOnly || gapStream != null) {
+            boolean evictable = columnRanges.isEvictable(pageId);
+            /*
+             * Persist iff all of the following conditions are satisfied:
+             * - The page is empty on disk ('empty' is true)
+             * - The page is not being evicted ('cloudOnly' is false)
+             * - The page is not planned for eviction ('evictable' is false)
+             * - The operation is not a merge (the component will be deleted anyway)
+             * - The disk has space
+             *
+             * Note: 'empty' can be false while 'cloudOnly' is true. We cannot read from disk as the page can be
+             * evicted at any moment. In other words, the sweeper told us that it is going to evict this page;
+             * hence 'cloudOnly' is true.
+             */
+            boolean persist = empty && !cloudOnly && !evictable && operation != MERGE && drive.hasSpace();
+            buffer = readFromStream(ioManager, fileHandle, header, cPage, persist);
+            buffer.position(RESERVED_HEADER_BYTES);
+        } else {
+            /*
+             * Here we can find a page that is planned for eviction but has not been evicted yet
+             * (i.e., empty = false). This could happen if the cursor is at a point the sweeper hasn't
+             * reached yet (i.e., cloudOnly = false).
+             */
+            buffer = DEFAULT.processHeader(ioManager, fileHandle, header, cPage);
+        }
+
+        if (++pageCounter == numberOfContiguousPages) {
+            close();
+        }
+
+        return buffer;
+    }
+
+    void close() throws HyracksDataException {
+        if (gapStream != null) {
+            try {
+                gapStream.close();
+                gapStream = null;
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
+        }
+    }
+
+    private ByteBuffer readFromStream(IOManager ioManager, BufferedFileHandle fileHandle,
+            BufferCacheHeaderHelper header, CachedPage cPage, boolean persist) throws HyracksDataException {
+        InputStream stream = getOrCreateStream(ioManager, fileHandle, cPage);
+        ByteBuffer buffer = header.getBuffer();
+        buffer.position(0);
+        try {
+            while (buffer.remaining() != 0) {
+                int length = stream.read(buffer.array(), buffer.position(), buffer.remaining());
+                if (length < 0) {
+                    throw new IllegalStateException("Stream should not be empty!");
+                }
+                buffer.position(buffer.position() + length);
+            }
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+
+        buffer.flip();
+
+        if (persist) {
+            long offset = cPage.getCompressedPageOffset();
+            ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
+            BufferCacheCloudReadContextUtil.persist(cloudIOManager, fileHandle.getFileHandle(), buffer, offset);
+        }
+
+        return buffer;
+    }
+
+    private InputStream getOrCreateStream(IOManager ioManager, BufferedFileHandle fileHandle, CachedPage cPage)
+            throws HyracksDataException {
+        if (gapStream != null) {
+            return gapStream;
+        }
+
+        LOGGER.info("Cloud stream read for {} pages", numberOfContiguousPages - pageCounter);
+        int requiredNumOfPages = numberOfContiguousPages - pageCounter;
+        long offset = cPage.getCompressedPageOffset();
+        int pageId = BufferedFileHandle.getPageId(cPage.getDiskPageId());
+        long length = fileHandle.getPagesTotalSize(pageId, requiredNumOfPages);
+
+        ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
+        gapStream = cloudIOManager.cloudRead(fileHandle.getFileHandle(), offset, length);
+
+        return gapStream;
+    }
+
+}
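The skip loop in onPin() above is required because InputStream.skip() may legally skip fewer bytes than requested. A minimal standalone illustration of the skip-fully pattern; the guard against a non-positive return is a defensive addition for end-of-stream, not part of this change:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public final class SkipFullyDemo {
    static void skipFully(InputStream in, long n) throws IOException {
        long remaining = n;
        while (remaining > 0) {
            long skipped = in.skip(remaining);
            if (skipped <= 0) {
                // Without this guard, a stream at EOF would loop forever
                throw new IOException("Unexpected end of stream while skipping");
            }
            remaining -= skipped;
        }
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream(new byte[100]);
        skipFully(in, 64);
        System.out.println(in.read()); // prints 0, the 65th byte
    }
}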
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/write/CloudColumnWriteContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/write/CloudColumnWriteContext.java
new file mode 100644
index 0000000..dcde833
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/write/CloudColumnWriteContext.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.write;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.cloud.buffercache.context.DefaultCloudOnlyWriteContext;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweepPlanner;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheWriteContext;
+import org.apache.hyracks.storage.common.buffercache.context.write.DefaultBufferCacheWriteContext;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+
+import it.unimi.dsi.fastutil.ints.IntArrays;
+import it.unimi.dsi.fastutil.ints.IntSet;
+
+public final class CloudColumnWriteContext implements IColumnWriteContext {
+    private static final int INITIAL_NUMBER_OF_COLUMNS = 32;
+    private final IPhysicalDrive drive;
+    private final ColumnSweepPlanner planner;
+    private final BitSet plan;
+    private final IntSet indexedColumns;
+    private int[] sizes;
+    private int numberOfColumns;
+    private IBufferCacheWriteContext currentContext;
+    /**
+     * 'writeLocallyAndSwitchToCloudOnly' = true means the next call to write() will persist the page locally and
+     * then switch to the cloud-only writer (i.e., the following pages will be written to the cloud but not locally).
+     * <p>
+     * 'writeLocallyAndSwitchToCloudOnly' is set to 'true' iff the previous column's last page should be persisted
+     * AND it overlaps with this column's first page.
+     */
+    private boolean writeLocallyAndSwitchToCloudOnly;
+
+    public CloudColumnWriteContext(IPhysicalDrive drive, ColumnSweepPlanner planner, int numberOfColumns) {
+        this.drive = drive;
+        this.planner = planner;
+        this.plan = planner.getPlanCopy();
+        this.indexedColumns = planner.getIndexedColumnsCopy();
+        int initialLength =
+                planner.getNumberOfPrimaryKeys() == numberOfColumns ? INITIAL_NUMBER_OF_COLUMNS : numberOfColumns;
+        sizes = new int[initialLength];
+        // Number of columns is not known during the flush operation
+        this.numberOfColumns = 0;
+        currentContext = DefaultBufferCacheWriteContext.INSTANCE;
+    }
+
+    @Override
+    public void startWritingColumn(int columnIndex, boolean overlapping) {
+        if (drive.hasSpace() || indexedColumns.contains(columnIndex)) {
+            // The current column should be persisted locally if free disk space permits
+            currentContext = DefaultBufferCacheWriteContext.INSTANCE;
+        } else if (plan.get(columnIndex)) {
+            // This column was planned for eviction, do not persist.
+            if (overlapping && currentContext == DefaultBufferCacheWriteContext.INSTANCE) {
+                // The previous column's last page should be persisted AND it is overlapping with the current column's
+                // first page. Persist the first page locally and switch to cloud-only writer.
+                writeLocallyAndSwitchToCloudOnly = true;
+            } else {
+                // The previous column's last page is not overlapping. Switch to cloud-only writer (if not already)
+                currentContext = DefaultCloudOnlyWriteContext.INSTANCE;
+            }
+        } else {
+            // Local drive is pressured. Write to cloud only.
+            currentContext = DefaultCloudOnlyWriteContext.INSTANCE;
+        }
+    }
+
+    @Override
+    public void endWritingColumn(int columnIndex, int size) {
+        ensureCapacity(columnIndex);
+        sizes[columnIndex] = Math.max(sizes[columnIndex], size);
+    }
+
+    @Override
+    public void columnsPersisted() {
+        // Set the default writer context to persist pageZero and the interior nodes' pages locally
+        currentContext = DefaultBufferCacheWriteContext.INSTANCE;
+        writeLocallyAndSwitchToCloudOnly = false;
+    }
+
+    @Override
+    public void close() {
+        // Report the sizes of the written columns
+        planner.adjustColumnSizes(sizes, numberOfColumns);
+    }
+
+    /*
+     * ************************************************************************************************
+     * WRITE methods
+     * ************************************************************************************************
+     */
+
+    @Override
+    public int write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer data)
+            throws HyracksDataException {
+        int writtenBytes = currentContext.write(ioManager, handle, offset, data);
+        switchIfNeeded();
+        return writtenBytes;
+    }
+
+    @Override
+    public long write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer[] data)
+            throws HyracksDataException {
+        long writtenBytes = currentContext.write(ioManager, handle, offset, data);
+        switchIfNeeded();
+        return writtenBytes;
+    }
+
+    /*
+     * ************************************************************************************************
+     * helper methods
+     * ************************************************************************************************
+     */
+
+    private void ensureCapacity(int columnIndex) {
+        int length = sizes.length;
+        if (columnIndex >= length) {
+            sizes = IntArrays.grow(sizes, columnIndex + 1);
+        }
+
+        numberOfColumns = Math.max(numberOfColumns, columnIndex + 1);
+    }
+
+    private void switchIfNeeded() {
+        if (writeLocallyAndSwitchToCloudOnly) {
+            // Switch to cloud-only writer
+            currentContext = DefaultCloudOnlyWriteContext.INSTANCE;
+            writeLocallyAndSwitchToCloudOnly = false;
+        }
+    }
+}
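A toy model of the switching logic above (class and method names are illustrative; the free-disk-space and indexed-column checks are omitted): when a column planned for eviction starts on a page shared with a locally persisted column, the first write stays local and the writer then flips to cloud-only for the remaining pages.

public final class WriteSwitchDemo {
    enum Writer { LOCAL, CLOUD_ONLY }

    private Writer current = Writer.LOCAL;
    private boolean writeLocallyAndSwitchToCloudOnly;

    void startColumn(boolean plannedForEviction, boolean overlapping) {
        if (!plannedForEviction) {
            current = Writer.LOCAL;
        } else if (overlapping && current == Writer.LOCAL) {
            // Shared boundary page: write it locally once, then go cloud-only
            writeLocallyAndSwitchToCloudOnly = true;
        } else {
            current = Writer.CLOUD_ONLY;
        }
    }

    Writer write() {
        Writer used = current;
        if (writeLocallyAndSwitchToCloudOnly) {
            current = Writer.CLOUD_ONLY;
            writeLocallyAndSwitchToCloudOnly = false;
        }
        return used;
    }

    public static void main(String[] args) {
        WriteSwitchDemo demo = new WriteSwitchDemo();
        demo.startColumn(false, false);
        System.out.println(demo.write()); // LOCAL
        demo.startColumn(true, true);     // evictable column sharing the boundary page
        System.out.println(demo.write()); // LOCAL (the shared page)
        System.out.println(demo.write()); // CLOUD_ONLY (remaining pages)
    }
}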
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepLockInfo.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepLockInfo.java
new file mode 100644
index 0000000..ed83e84
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepLockInfo.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import java.util.BitSet;
+
+import org.apache.hyracks.cloud.buffercache.page.ISweepLockInfo;
+
+public final class ColumnSweepLockInfo implements ISweepLockInfo {
+    private final BitSet lockedColumns;
+
+    public ColumnSweepLockInfo() {
+        lockedColumns = new BitSet();
+    }
+
+    /**
+     * Reset the lock with the plan's columns
+     *
+     * @param plan contains the columns to be locked
+     */
+    void reset(BitSet plan) {
+        lockedColumns.clear();
+        lockedColumns.or(plan);
+    }
+
+    /**
+     * Clear and set the locked columns in the provided {@link BitSet}
+     *
+     * @param lockedColumns output {@link BitSet} that receives the locked columns
+     */
+    public void getLockedColumns(BitSet lockedColumns) {
+        lockedColumns.clear();
+        lockedColumns.or(this.lockedColumns);
+    }
+
+    @Override
+    public boolean isLocked() {
+        return true;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java
new file mode 100644
index 0000000..9549f4f
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import static org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils.isMergedComponent;
+
+import java.util.BitSet;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTree;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import it.unimi.dsi.fastutil.ints.IntArrays;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import it.unimi.dsi.fastutil.longs.LongArrays;
+
+public final class ColumnSweepPlanner {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final double SIZE_WEIGHT = 0.3d;
+    private static final double LAST_ACCESS_WEIGHT = 1.0d - SIZE_WEIGHT;
+    private static final double INITIAL_PUNCHABLE_THRESHOLD = 0.7d;
+    private static final double PUNCHABLE_THRESHOLD_DECREMENT = 0.7d;
+    private static final int MAX_ITERATION_COUNT = 5;
+    private static final int REEVALUATE_PLAN_THRESHOLD = 50;
+
+    private final AtomicBoolean active;
+    private final int numberOfPrimaryKeys;
+    private final BitSet plan;
+    private final BitSet reevaluatedPlan;
+    private final IntSet indexedColumns;
+    private final ISweepClock clock;
+    private int numberOfColumns;
+    private long lastAccess;
+    private int maxSize;
+    private int[] sizes;
+    private long[] lastAccesses;
+
+    private double punchableThreshold;
+    private long lastSweepTs;
+    private int numberOfSweptColumns;
+    private int numberOfCloudRequests;
+
+    public ColumnSweepPlanner(int numberOfPrimaryKeys, ISweepClock clock) {
+        this.clock = clock;
+        active = new AtomicBoolean(false);
+        this.numberOfPrimaryKeys = numberOfPrimaryKeys;
+        sizes = new int[0];
+        lastAccesses = new long[0];
+        indexedColumns = new IntOpenHashSet();
+        plan = new BitSet();
+        reevaluatedPlan = new BitSet();
+        punchableThreshold = INITIAL_PUNCHABLE_THRESHOLD;
+    }
+
+    public boolean isActive() {
+        return active.get();
+    }
+
+    public int getNumberOfPrimaryKeys() {
+        return numberOfPrimaryKeys;
+    }
+
+    public void onActivate(int numberOfColumns, List<ILSMDiskComponent> diskComponents,
+            IColumnTupleProjector sweepProjector, IBufferCache bufferCache) throws HyracksDataException {
+        resizeStatsArrays(numberOfColumns);
+        setInitialSizes(diskComponents, sweepProjector, bufferCache);
+        active.set(true);
+    }
+
+    public void setIndexedColumns(IColumnProjectionInfo projectionInfo) {
+        indexedColumns.clear();
+        for (int i = 0; i < projectionInfo.getNumberOfProjectedColumns(); i++) {
+            int columnIndex = projectionInfo.getColumnIndex(i);
+            indexedColumns.add(columnIndex);
+        }
+    }
+
+    public IntSet getIndexedColumnsCopy() {
+        return new IntOpenHashSet(indexedColumns);
+    }
+
+    public synchronized void access(IColumnProjectionInfo projectionInfo, boolean hasSpace) {
+        resetPlanIfNeeded(hasSpace);
+        long accessTime = clock.getCurrentTime();
+        lastAccess = accessTime;
+        int numberOfColumns = projectionInfo.getNumberOfProjectedColumns();
+        boolean requireCloudAccess = false;
+        for (int i = 0; i < numberOfColumns; i++) {
+            int columnIndex = projectionInfo.getColumnIndex(i);
+            if (columnIndex >= 0) {
+                // columnIndex can be -1 when accessing a non-existing column (i.e., not known by the schema)
+                lastAccesses[columnIndex] = accessTime;
+                requireCloudAccess |= numberOfSweptColumns > 0 && plan.get(columnIndex);
+            }
+        }
+
+        numberOfCloudRequests += requireCloudAccess ? 1 : 0;
+    }
+
+    public synchronized void adjustColumnSizes(int[] newSizes, int numberOfColumns) {
+        resizeStatsArrays(numberOfColumns);
+        for (int i = 0; i < numberOfColumns; i++) {
+            int newSize = newSizes[i];
+            sizes[i] = Math.max(sizes[i], newSize);
+            maxSize = Math.max(maxSize, newSize);
+        }
+    }
+
+    public synchronized boolean plan() {
+        plan.clear();
+        int numberOfEvictableColumns = 0;
+        int iter = 0;
+        // Calculate weights: ensure the plan includes new columns that have never been swept
+        while (iter < MAX_ITERATION_COUNT && numberOfEvictableColumns < numberOfColumns) {
+            if (numberOfEvictableColumns > 0) {
+                // Do not reiterate if we found columns to evict
+                break;
+            }
+
+            // Find evictable columns
+            numberOfEvictableColumns += findEvictableColumns(plan);
+
+            // Decay the threshold so the next iteration/plan is more aggressive
+            punchableThreshold *= PUNCHABLE_THRESHOLD_DECREMENT;
+            iter++;
+        }
+        // Register the plan time
+        lastSweepTs = clock.getCurrentTime();
+        // Add the number of evictable columns
+        numberOfSweptColumns += numberOfEvictableColumns;
+        if (numberOfEvictableColumns > 0) {
+            LOGGER.info("Planning to evict {} columns. The evictable columns are {}", numberOfEvictableColumns, plan);
+            return true;
+        }
+
+        LOGGER.info("Couldn't find columns to evict after {} iteration", iter);
+        return false;
+    }
+
+    public synchronized BitSet getPlanCopy() {
+        return (BitSet) plan.clone();
+    }
+
+    private double getWeight(int i, IntSet indexedColumns, int numberOfPrimaryKeys) {
+        if (i < numberOfPrimaryKeys || indexedColumns.contains(i)) {
+            return -1.0;
+        }
+
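+        // Larger and colder (less recently accessed) columns get higher weights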
+        double sizeWeight = sizes[i] / (double) maxSize * SIZE_WEIGHT;
+        double lastAccessWeight = (lastAccess - lastAccesses[i]) / (double) lastAccess * LAST_ACCESS_WEIGHT;
+
+        return sizeWeight + lastAccessWeight;
+    }
+
+    private void resizeStatsArrays(int numberOfColumns) {
+        sizes = IntArrays.ensureCapacity(sizes, numberOfColumns);
+        lastAccesses = LongArrays.ensureCapacity(lastAccesses, numberOfColumns);
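+        // Track only non-primary-key columns, as primary key columns are never evicted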
+        this.numberOfColumns = numberOfColumns - numberOfPrimaryKeys;
+    }
+
+    private void setInitialSizes(List<ILSMDiskComponent> diskComponents, IColumnTupleProjector sweepProjector,
+            IBufferCache bufferCache) throws HyracksDataException {
+        // This runs when activating an index (no need to synchronize on the opTracker)
+        if (diskComponents.isEmpty()) {
+            return;
+        }
+
+        IColumnProjectionInfo columnProjectionInfo =
+                ColumnSweeperUtil.createColumnProjectionInfo(diskComponents, sweepProjector);
+        ILSMDiskComponent latestComponent = diskComponents.get(0);
+        ILSMDiskComponent oldestComponent = diskComponents.get(diskComponents.size() - 1);
+        ColumnBTreeReadLeafFrame leafFrame = ColumnSweeperUtil.createLeafFrame(columnProjectionInfo, latestComponent);
+        ColumnRanges ranges = new ColumnRanges(columnProjectionInfo.getNumberOfPrimaryKeys());
+        // Get the column sizes from the freshest component, which has the columns of the most recent schema
+        setColumnSizes(latestComponent, leafFrame, ranges, bufferCache);
+        if (isMergedComponent(oldestComponent.getId())) {
+            // Get the column sizes from the oldest merged component, which probably has the largest columns
+            setColumnSizes(oldestComponent, leafFrame, ranges, bufferCache);
+        }
+    }
+
+    private void setColumnSizes(ILSMDiskComponent diskComponent, ColumnBTreeReadLeafFrame leafFrame,
+            ColumnRanges ranges, IBufferCache bufferCache) throws HyracksDataException {
+        ColumnBTree columnBTree = (ColumnBTree) diskComponent.getIndex();
+        long dpid = BufferedFileHandle.getDiskPageId(columnBTree.getFileId(), columnBTree.getBulkloadLeafStart());
+        ICachedPage page = bufferCache.pin(dpid);
+        try {
+            leafFrame.setPage(page);
+            ranges.reset(leafFrame);
+            for (int i = 0; i < leafFrame.getNumberOfColumns(); i++) {
+                sizes[i] = Math.max(sizes[i], ranges.getColumnLength(i));
+                maxSize = Math.max(maxSize, sizes[i]);
+            }
+        } finally {
+            bufferCache.unpin(page);
+        }
+    }
+
+    private int findEvictableColumns(BitSet plan) {
+        int numberOfEvictableColumns = 0;
+        for (int i = 0; i < sizes.length; i++) {
+            if (!plan.get(i) && getWeight(i, indexedColumns, numberOfPrimaryKeys) >= punchableThreshold) {
+                // Column reached a punchable threshold; include it in the eviction plan.
+                plan.set(i);
+                numberOfEvictableColumns++;
+            }
+        }
+
+        return numberOfEvictableColumns;
+    }
+
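+    /**
+     * Re-evaluates the plan once enough cloud requests have accumulated while space is available.
+     * If the current plan no longer covers the newly evictable columns, it is invalidated and replaced.
+     */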
+    private void resetPlanIfNeeded(boolean hasSpace) {
+        if (!hasSpace || numberOfCloudRequests < REEVALUATE_PLAN_THRESHOLD) {
+            return;
+        }
+
+        numberOfCloudRequests = 0;
+        reevaluatedPlan.clear();
+        int numberOfEvictableColumns = findEvictableColumns(reevaluatedPlan);
+        for (int i = 0; i < numberOfEvictableColumns; i++) {
+            int columnIndex = reevaluatedPlan.nextSetBit(i);
+            if (!plan.get(columnIndex)) {
+                // the plan contains a stale column. Invalidate!
+                plan.clear();
+                plan.or(reevaluatedPlan);
+                LOGGER.info("Re-planning to evict {} columns. The newly evictable columns are {}",
+                        numberOfEvictableColumns, plan);
+                break;
+            }
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
new file mode 100644
index 0000000..4932bf0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import static org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState.READABLE_UNWRITABLE;
+import static org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils.isMergedComponent;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
+import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.sweeper.SweepContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTree;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.LSMColumnBTree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.annotations.CriticalPath;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
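+/**
+ * Executes an eviction plan produced by {@link ColumnSweepPlanner}: for each sweepable merged
+ * disk component, it punches holes in the pages of the planned columns to free local disk space.
+ */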
+public final class ColumnSweeper {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final ColumnSweepLockInfo lockedColumns;
+    private final ColumnRanges ranges;
+    private final List<ILSMDiskComponent> sweepableComponents;
+
+    public ColumnSweeper(int numberOfPrimaryKeys) {
+        lockedColumns = new ColumnSweepLockInfo();
+        ranges = new ColumnRanges(numberOfPrimaryKeys);
+        sweepableComponents = new ArrayList<>();
+    }
+
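+    /**
+     * Sweeps the columns in {@code plan} out of all sweepable components of the given index and
+     * returns the number of bytes freed from the local disk.
+     */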
+    public long sweep(BitSet plan, SweepContext context, IColumnTupleProjector sweepProjector)
+            throws HyracksDataException {
+        IndexUnit indexUnit = context.getIndexUnit();
+        LSMColumnBTree lsmColumnBTree = (LSMColumnBTree) indexUnit.getIndex();
+        IColumnProjectionInfo projectionInfo = captureSweepableComponents(lsmColumnBTree, sweepProjector);
+        if (projectionInfo == null) {
+            // no sweepable components
+            return 0L;
+        }
+
+        LOGGER.info("Sweeping {}", lsmColumnBTree);
+        ILSMDiskComponent latestComponent = sweepableComponents.get(0);
+        ColumnBTreeReadLeafFrame leafFrame = ColumnSweeperUtil.createLeafFrame(projectionInfo, latestComponent);
+        IBufferCacheReadContext bcOpCtx = SweepBufferCacheReadContext.INSTANCE;
+        lockedColumns.reset(plan);
+        long freedSpace = 0L;
+        for (int i = 0; i < sweepableComponents.size(); i++) {
+            if (context.stopSweeping()) {
+                // Exit as the index is being dropped
+                return 0L;
+            }
+
+            boolean failed = false;
+            // Components are entered one at a time so that other components can still be merged or deactivated
+            ILSMDiskComponent diskComponent = enterAndGetComponent(i, lsmColumnBTree);
+            // If diskComponent is null, that means it is not a viable candidate anymore
+            if (diskComponent != null) {
+                try {
+                    freedSpace += sweepDiskComponent(leafFrame, diskComponent, plan, context, bcOpCtx);
+                } catch (Throwable e) {
+                    failed = true;
+                    throw e;
+                } finally {
+                    exitComponent(diskComponent, lsmColumnBTree, failed);
+                }
+            }
+        }
+
+        LOGGER.info("Swept {} components and freed {} from disk", sweepableComponents.size(),
+                StorageUtil.toHumanReadableSize(freedSpace));
+        return freedSpace;
+    }
+
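+    /**
+     * Collects the merged components that are in READABLE_UNWRITABLE state (under the operation
+     * tracker's lock) and builds the projection info from the newest component's column metadata.
+     */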
+    @CriticalPath
+    private IColumnProjectionInfo captureSweepableComponents(LSMColumnBTree lsmColumnBTree,
+            IColumnTupleProjector sweepProjector) throws HyracksDataException {
+        ILSMOperationTracker opTracker = lsmColumnBTree.getOperationTracker();
+        sweepableComponents.clear();
+        synchronized (opTracker) {
+            List<ILSMDiskComponent> diskComponents = lsmColumnBTree.getDiskComponents();
+            for (int i = 0; i < diskComponents.size(); i++) {
+                ILSMDiskComponent diskComponent = diskComponents.get(i);
+                /*
+                 * Get components that are only in READABLE_UNWRITABLE state. Components that are currently being
+                 * merged should not be swept as they will be deleted anyway.
+                 * Also, only sweep merged components as flushed components are relatively smaller and should be
+                 * merged eventually. So, it is preferable to read everything locally when merging flushed components.
+                 * TODO should we sweep flushed components?
+                 */
+                if (isMergedComponent(diskComponent.getId()) && diskComponent.getState() == READABLE_UNWRITABLE
+                        && diskComponent.getComponentSize() > 0) {
+                    // The component is a good candidate to be swept
+                    sweepableComponents.add(diskComponent);
+                }
+            }
+
+            if (sweepableComponents.isEmpty()) {
+                // No sweepable components
+                return null;
+            }
+
+            return ColumnSweeperUtil.createColumnProjectionInfo(sweepableComponents, sweepProjector);
+        }
+    }
+
+    private ILSMDiskComponent enterAndGetComponent(int index, LSMColumnBTree lsmColumnBTree)
+            throws HyracksDataException {
+        ILSMDiskComponent diskComponent = sweepableComponents.get(index);
+        synchronized (lsmColumnBTree.getOperationTracker()) {
+            // Make sure the component is still in READABLE_UNWRITABLE state
+            if (diskComponent.getState() == READABLE_UNWRITABLE
+                    && diskComponent.threadEnter(LSMOperationType.DISK_COMPONENT_SCAN, false)) {
+                return diskComponent;
+            }
+        }
+
+        // the component is not a viable candidate anymore
+        return null;
+    }
+
+    private void exitComponent(ILSMDiskComponent diskComponent, LSMColumnBTree lsmColumnBTree, boolean failed)
+            throws HyracksDataException {
+        synchronized (lsmColumnBTree.getOperationTracker()) {
+            diskComponent.threadExit(LSMOperationType.DISK_COMPONENT_SCAN, failed, false);
+        }
+    }
+
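+    /**
+     * Walks the leaf pages of a disk component and punches holes in the evictable page ranges of
+     * every mega leaf node whose columns could be sweep-locked.
+     */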
+    private long sweepDiskComponent(ColumnBTreeReadLeafFrame leafFrame, ILSMDiskComponent diskComponent, BitSet plan,
+            SweepContext context, IBufferCacheReadContext bcOpCtx) throws HyracksDataException {
+        long dpid = getFirstPageId(diskComponent);
+        int fileId = BufferedFileHandle.getFileId(dpid);
+        int nextPageId = BufferedFileHandle.getPageId(dpid);
+        long freedSpace = 0L;
+        context.open(fileId);
+        try {
+            while (nextPageId >= 0) {
+                if (context.stopSweeping()) {
+                    // Exit as the index is being dropped
+                    return 0L;
+                }
+                CloudCachedPage page0 = context.pin(BufferedFileHandle.getDiskPageId(fileId, nextPageId), bcOpCtx);
+                boolean columnsLocked = false;
+                try {
+                    leafFrame.setPage(page0);
+                    nextPageId = leafFrame.getNextLeaf();
+                    columnsLocked = page0.trySweepLock(lockedColumns);
+                    if (columnsLocked) {
+                        ranges.reset(leafFrame, plan);
+                        freedSpace += punchHoles(context, leafFrame);
+                    }
+                } finally {
+                    if (columnsLocked) {
+                        page0.sweepUnlock();
+                    }
+                    context.unpin(page0, bcOpCtx);
+                }
+            }
+        } finally {
+            context.close();
+        }
+
+        return freedSpace;
+    }
+
+    private long getFirstPageId(ILSMDiskComponent diskComponent) {
+        ColumnBTree columnBTree = (ColumnBTree) diskComponent.getIndex();
+        int fileId = columnBTree.getFileId();
+        int firstPage = columnBTree.getBulkloadLeafStart();
+        return BufferedFileHandle.getDiskPageId(fileId, firstPage);
+    }
+
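+    /**
+     * Frees the evictable page ranges of the current mega leaf node by punching holes in the
+     * underlying file. Page zero is never evicted.
+     */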
+    private int punchHoles(SweepContext context, ColumnBTreeReadLeafFrame leafFrame) throws HyracksDataException {
+        int freedSpace = 0;
+        int numberOfPages = leafFrame.getMegaLeafNodeNumberOfPages();
+        int pageZeroId = leafFrame.getPageId();
+
+        // Start from 1 as we do not evict pageZero
+        BitSet nonEvictablePages = ranges.getNonEvictablePages();
+        int start = nonEvictablePages.nextClearBit(1);
+        while (start < numberOfPages) {
+            int end = nonEvictablePages.nextSetBit(start);
+            int numberOfEvictablePages = end - start;
+            freedSpace += context.punchHole(pageZeroId + start, numberOfEvictablePages);
+
+            start = nonEvictablePages.nextClearBit(end);
+        }
+
+        return freedSpace;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeperUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeperUtil.java
new file mode 100644
index 0000000..e51f1d7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeperUtil.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTree;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeLeafFrameFactory;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+
+public class ColumnSweeperUtil {
+    public static final BitSet EMPTY = new BitSet();
+
+    private ColumnSweeperUtil() {
+    }
+
+    static IColumnProjectionInfo createColumnProjectionInfo(List<ILSMDiskComponent> diskComponents,
+            IColumnTupleProjector projector) throws HyracksDataException {
+        // Get the newest disk component, which has the newest column metadata
+        ILSMDiskComponent newestComponent = diskComponents.get(0);
+        IValueReference columnMetadata = ColumnUtil.getColumnMetadataCopy(newestComponent.getMetadata());
+
+        return projector.createProjectionInfo(columnMetadata);
+    }
+
+    static ColumnBTreeReadLeafFrame createLeafFrame(IColumnProjectionInfo projectionInfo,
+            ILSMDiskComponent diskComponent) {
+        ColumnBTree columnBTree = (ColumnBTree) diskComponent.getIndex();
+        ColumnBTreeLeafFrameFactory leafFrameFactory = (ColumnBTreeLeafFrameFactory) columnBTree.getLeafFrameFactory();
+        return leafFrameFactory.createReadFrame(projectionInfo);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ISweepClock.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ISweepClock.java
new file mode 100644
index 0000000..f36a51d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ISweepClock.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
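+/**
+ * Time source used by the sweep planner, abstracted so tests can control time.
+ */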
+@FunctionalInterface
+public interface ISweepClock {
+    long getCurrentTime();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/SweepBufferCacheReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/SweepBufferCacheReadContext.java
new file mode 100644
index 0000000..7b51d55
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/SweepBufferCacheReadContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import static org.apache.hyracks.cloud.buffercache.context.DefaultCloudReadContext.readAndPersistPage;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+@ThreadSafe
+final class SweepBufferCacheReadContext implements IBufferCacheReadContext {
+    static final IBufferCacheReadContext INSTANCE = new SweepBufferCacheReadContext();
+
+    private SweepBufferCacheReadContext() {
+    }
+
+    @Override
+    public void onPin(ICachedPage page) {
+        // NoOp
+    }
+
+    @Override
+    public void onUnpin(ICachedPage page) {
+        // NoOp
+    }
+
+    @Override
+    public boolean isNewPage() {
+        return false;
+    }
+
+    @Override
+    public boolean incrementStats() {
+        // Do not increment the stats for the sweeper
+        return false;
+    }
+
+    @Override
+    public ByteBuffer processHeader(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
+            CachedPage cPage) throws HyracksDataException {
+        // Pass 'false' so the page is not persisted locally, as the sweeper runs when the disk is pressured
+        return readAndPersistPage(ioManager, fileHandle, header, cPage, false);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlannerTest.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlannerTest.java
new file mode 100644
index 0000000..6bace98
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlannerTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.QUERY;
+import static org.apache.hyracks.util.StorageUtil.getIntSizeInBytes;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Random;
+
+import org.apache.hyracks.storage.am.lsm.btree.column.dummy.DummyColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.dummy.DummySweepClock;
+import org.apache.hyracks.util.StorageUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+
+public class ColumnSweepPlannerTest {
+    private static final int MAX_MEGA_LEAF_NODE_SIZE = getIntSizeInBytes(10, StorageUtil.StorageUnit.MEGABYTE);
+    private static final Random RANDOM = new Random(0);
+    private final DummySweepClock clock = new DummySweepClock();
+
+    @Test
+    public void test10Columns() {
+        int numberOfPrimaryKeys = 1;
+        int numberOfColumns = numberOfPrimaryKeys + 10;
+        int[] columnSizes = createNormalColumnSizes(numberOfPrimaryKeys, numberOfColumns);
+        ColumnSweepPlanner planner = new ColumnSweepPlanner(numberOfPrimaryKeys, clock);
+        IntList projectedColumns = new IntArrayList();
+        DummyColumnProjectionInfo info = new DummyColumnProjectionInfo(numberOfPrimaryKeys, QUERY, projectedColumns);
+
+        // Adjust sizes
+        planner.adjustColumnSizes(columnSizes, numberOfColumns);
+
+        // Project 3 columns
+        selectProjectedColumns(numberOfPrimaryKeys, numberOfColumns, 3, projectedColumns);
+        // access the projected columns (max 10 times)
+        access(planner, info, true, 10);
+
+        // Advance clock
+        clock.advance(10);
+
+        // Plan for eviction
+        BitSet keptColumns = new BitSet();
+        planner.plan();
+        computeKeptColumns(planner.getPlanCopy(), keptColumns, numberOfColumns);
+
+        // Project another 3 columns
+        selectProjectedColumns(numberOfPrimaryKeys, numberOfColumns, 3, projectedColumns);
+        // access the projected columns
+        access(planner, info, true, 100);
+
+        // At this point, the plan should have been re-evaluated (and changed) during the accesses above
+        BitSet newKeptColumns = new BitSet();
+        computeKeptColumns(planner.getPlanCopy(), newKeptColumns, numberOfColumns);
+
+        Assert.assertNotEquals(keptColumns, newKeptColumns);
+    }
+
+    private void computeKeptColumns(BitSet plan, BitSet keptColumns, int numberOfColumns) {
+        keptColumns.clear();
+        for (int i = 0; i < numberOfColumns; i++) {
+            if (!plan.get(i)) {
+                keptColumns.set(i);
+            }
+        }
+
+        System.out.println("Kept columns: " + keptColumns);
+    }
+
+    private void access(ColumnSweepPlanner planner, DummyColumnProjectionInfo info, boolean hasSpace, int bound) {
+        int numberOfAccesses = RANDOM.nextInt(1, bound);
+        for (int i = 0; i < numberOfAccesses; i++) {
+            planner.access(info, hasSpace);
+            clock.advance(1);
+        }
+
+        System.out.println("Accessed: " + info + " " + numberOfAccesses + " time");
+    }
+
+    private void selectProjectedColumns(int numberOfPrimaryKeys, int numberOfColumns, int numberOfProjectedColumns,
+            IntList projectedColumns) {
+        IntSet alreadyProjectedColumns = new IntOpenHashSet();
+        projectedColumns.clear();
+        for (int i = 0; i < numberOfProjectedColumns; i++) {
+            int columnIndex = RANDOM.nextInt(numberOfPrimaryKeys, numberOfColumns);
+            while (alreadyProjectedColumns.contains(columnIndex)) {
+                columnIndex = RANDOM.nextInt(numberOfPrimaryKeys, numberOfColumns);
+            }
+            projectedColumns.add(columnIndex);
+            alreadyProjectedColumns.add(columnIndex);
+        }
+    }
+
+    private int[] createNormalColumnSizes(int numberOfPrimaryKeys, int numberOfColumns) {
+        int[] columnSizes = new int[numberOfColumns];
+        double[] normalDistribution = new double[numberOfColumns];
+        double sum = 0.0d;
+        for (int i = 0; i < numberOfColumns; i++) {
+            double value = Math.abs(RANDOM.nextGaussian());
+            normalDistribution[i] = value;
+            sum += value;
+        }
+
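+        // Leave primary key sizes at zero; primary key columns are never evicted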
+        for (int i = numberOfPrimaryKeys; i < numberOfColumns; i++) {
+            int size = (int) Math.round((normalDistribution[i] / sum) * MAX_MEGA_LEAF_NODE_SIZE);
+            columnSizes[i] = size;
+        }
+
+        System.out.println("Column sizes:");
+        for (int i = 0; i < numberOfColumns; i++) {
+            System.out.println(i + ": " + StorageUtil.toHumanReadableSize(columnSizes[i]));
+        }
+        System.out.println("TotalSize:" + StorageUtil.toHumanReadableSize(Arrays.stream(columnSizes).sum()));
+        return columnSizes;
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummyColumnProjectionInfo.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummyColumnProjectionInfo.java
new file mode 100644
index 0000000..3b9d6ba
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummyColumnProjectionInfo.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.dummy;
+
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+
+import it.unimi.dsi.fastutil.ints.IntList;
+
+public class DummyColumnProjectionInfo implements IColumnProjectionInfo {
+    private final int numberOfPrimaryKeys;
+    private final ColumnProjectorType projectorType;
+    private final IntList projectedColumns;
+
+    public DummyColumnProjectionInfo(int numberOfPrimaryKeys, ColumnProjectorType projectorType,
+            IntList projectedColumns) {
+        this.numberOfPrimaryKeys = numberOfPrimaryKeys;
+        this.projectorType = projectorType;
+        this.projectedColumns = projectedColumns;
+    }
+
+    @Override
+    public int getColumnIndex(int ordinal) {
+        return projectedColumns.getInt(ordinal);
+    }
+
+    @Override
+    public int getNumberOfProjectedColumns() {
+        return projectedColumns.size();
+    }
+
+    @Override
+    public int getNumberOfPrimaryKeys() {
+        return numberOfPrimaryKeys;
+    }
+
+    @Override
+    public int getFilteredColumnIndex(int ordinal) {
+        return -1;
+    }
+
+    @Override
+    public int getNumberOfFilteredColumns() {
+        return 0;
+    }
+
+    @Override
+    public ColumnProjectorType getProjectorType() {
+        return projectorType;
+    }
+
+    @Override
+    public String toString() {
+        return projectedColumns.toString();
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummySweepClock.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummySweepClock.java
new file mode 100644
index 0000000..6f209ec
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummySweepClock.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.dummy;
+
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ISweepClock;
+
+public class DummySweepClock implements ISweepClock {
+    private long timestamp;
+
+    @Override
+    public long getCurrentTime() {
+        return timestamp;
+    }
+
+    public void advance(long delta) {
+        timestamp += delta;
+    }
+}