[ASTERIXDB-3389][STO] Support caching/eviciting columns

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
Prepare columnar indexes to support caching and
evicting columns.

Change-Id: Ib557608b0b25219ffc00a76325091a92f5e77698
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18260
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableReadMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableReadMetadata.java
index 5ac38d7..b7c40bd 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableReadMetadata.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableReadMetadata.java
@@ -21,17 +21,27 @@
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
 
 public abstract class AbstractColumnImmutableReadMetadata extends AbstractColumnImmutableMetadata
         implements IColumnProjectionInfo {
+    private final ColumnProjectorType projectorType;
+
     protected AbstractColumnImmutableReadMetadata(ARecordType datasetType, ARecordType metaType,
-            int numberOfPrimaryKeys, IValueReference serializedMetadata, int numberOfColumns) {
+            int numberOfPrimaryKeys, IValueReference serializedMetadata, int numberOfColumns,
+            ColumnProjectorType projectorType) {
         super(datasetType, metaType, numberOfPrimaryKeys, serializedMetadata, numberOfColumns);
+        this.projectorType = projectorType;
     }
 
     /**
      * @return the corresponding reader (merge reader or query reader) given <code>this</code> metadata
      */
     public abstract AbstractColumnTupleReader createTupleReader();
+
+    @Override
+    public final ColumnProjectorType getProjectorType() {
+        return projectorType;
+    }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/BatchFinalizerVisitor.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/BatchFinalizerVisitor.java
index 4cbe09b..951a9fe 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/BatchFinalizerVisitor.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/BatchFinalizerVisitor.java
@@ -59,9 +59,8 @@
             columnMetadata.getMetaRoot().accept(this, null);
         }
 
-        int allocatedSpace = batchWriter.writePrimaryKeyColumns(primaryKeyWriters);
-        allocatedSpace += batchWriter.writeColumns(orderedColumns);
-        return allocatedSpace;
+        batchWriter.writePrimaryKeyColumns(primaryKeyWriters);
+        return batchWriter.writeColumns(orderedColumns);
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java
index f597e4f..c64b074 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java
@@ -41,10 +41,10 @@
         if (flushColumnMetadata.getMetaType() == null) {
             //no meta
             return new FlushColumnTupleWriter(flushColumnMetadata, pageSize, maxNumberOfTuples, tolerance,
-                    maxLeafNodeSize);
+                    maxLeafNodeSize, writeContext);
         }
         return new FlushColumnTupleWithMetaWriter(flushColumnMetadata, pageSize, maxNumberOfTuples, tolerance,
-                maxLeafNodeSize);
+                maxLeafNodeSize, writeContext);
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java
index b51b395..a1abf5e 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java
@@ -21,6 +21,7 @@
 import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
 import org.apache.asterix.om.lazy.TypedRecordLazyVisitablePointable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
 import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
 
 public class FlushColumnTupleWithMetaWriter extends FlushColumnTupleWriter {
@@ -28,8 +29,8 @@
     private final RecordLazyVisitablePointable metaPointable;
 
     public FlushColumnTupleWithMetaWriter(FlushColumnMetadata columnMetadata, int pageSize, int maxNumberOfTuples,
-            double tolerance, int maxLeafNodeSize) {
-        super(columnMetadata, pageSize, maxNumberOfTuples, tolerance, maxLeafNodeSize);
+            double tolerance, int maxLeafNodeSize, IColumnWriteContext writeContext) {
+        super(columnMetadata, pageSize, maxNumberOfTuples, tolerance, maxLeafNodeSize, writeContext);
         metaColumnTransformer = new ColumnTransformer(columnMetadata, columnMetadata.getMetaRoot());
         metaPointable = new TypedRecordLazyVisitablePointable(columnMetadata.getMetaType());
     }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
index 41cad49..65f5eb4 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
 import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
 
 public class FlushColumnTupleWriter extends AbstractColumnTupleWriter {
@@ -45,11 +46,11 @@
     protected int primaryKeysEstimatedSize;
 
     public FlushColumnTupleWriter(FlushColumnMetadata columnMetadata, int pageSize, int maxNumberOfTuples,
-            double tolerance, int maxLeafNodeSize) {
+            double tolerance, int maxLeafNodeSize, IColumnWriteContext writeContext) {
         this.columnMetadata = columnMetadata;
         transformer = new ColumnTransformer(columnMetadata, columnMetadata.getRoot());
         finalizer = new BatchFinalizerVisitor(columnMetadata);
-        writer = new ColumnBatchWriter(columnMetadata.getMultiPageOpRef(), pageSize, tolerance);
+        writer = new ColumnBatchWriter(columnMetadata.getMultiPageOpRef(), pageSize, tolerance, writeContext);
         this.maxNumberOfTuples = maxNumberOfTuples;
         this.maxLeafNodeSize = maxLeafNodeSize;
         pointable = new TypedRecordLazyVisitablePointable(columnMetadata.getDatasetType());
@@ -105,6 +106,7 @@
     @Override
     public final void close() {
         columnMetadata.close();
+        writer.close();
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
index 85569f9..531502e 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
@@ -40,6 +40,6 @@
     public AbstractColumnTupleWriter createColumnWriter(IColumnMetadata columnMetadata,
             IColumnWriteContext writeContext) {
         return new LoadColumnTupleWriter((FlushColumnMetadata) columnMetadata, pageSize, maxNumberOfTuples, tolerance,
-                maxLeafNodeSize, MultiComparator.create(cmpFactories));
+                maxLeafNodeSize, MultiComparator.create(cmpFactories), writeContext);
     }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
index ca14000..0ba49cf 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
 import org.apache.hyracks.storage.common.MultiComparator;
 
 public class LoadColumnTupleWriter extends FlushColumnTupleWriter {
@@ -32,8 +33,8 @@
     private final MultiComparator comparator;
 
     public LoadColumnTupleWriter(FlushColumnMetadata columnMetadata, int pageSize, int maxNumberOfTuples,
-            double tolerance, int maxLeafNodeSize, MultiComparator comparator) {
-        super(columnMetadata, pageSize, maxNumberOfTuples, tolerance, maxLeafNodeSize);
+            double tolerance, int maxLeafNodeSize, MultiComparator comparator, IColumnWriteContext writeContext) {
+        super(columnMetadata, pageSize, maxNumberOfTuples, tolerance, maxLeafNodeSize, writeContext);
         prevTupleKeys =
                 PointableTupleReference.create(columnMetadata.getNumberOfPrimaryKeys(), ArrayBackedValueStorage::new);
         this.comparator = comparator;
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnReadMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnReadMetadata.java
index a1eff69..0152f5a 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnReadMetadata.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnReadMetadata.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.column.operation.lsm.merge;
 
+import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MERGE;
+
 import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
@@ -43,7 +45,7 @@
 
     private MergeColumnReadMetadata(ARecordType datasetType, ARecordType metaType, int numberOfPrimaryKeys,
             IColumnValuesReader[] columnReaders, IValueReference serializedMetadata) {
-        super(datasetType, metaType, numberOfPrimaryKeys, serializedMetadata, columnReaders.length);
+        super(datasetType, metaType, numberOfPrimaryKeys, serializedMetadata, columnReaders.length, MERGE);
         this.columnReaders = columnReaders;
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java
index d792855..3f98c5f 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java
@@ -38,7 +38,8 @@
     public AbstractColumnTupleWriter createColumnWriter(IColumnMetadata columnMetadata,
             IColumnWriteContext writeContext) {
         MergeColumnWriteMetadata mergeWriteMetadata = (MergeColumnWriteMetadata) columnMetadata;
-        return new MergeColumnTupleWriter(mergeWriteMetadata, pageSize, maxNumberOfTuples, tolerance, maxLeafNodeSize);
+        return new MergeColumnTupleWriter(mergeWriteMetadata, pageSize, maxNumberOfTuples, tolerance, maxLeafNodeSize,
+                writeContext);
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
index d3c102a..5912a3b 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
@@ -34,6 +34,7 @@
 import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
 import org.apache.hyracks.storage.am.lsm.btree.column.error.ColumnarValueException;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -52,7 +53,7 @@
     private int numberOfAntiMatter;
 
     public MergeColumnTupleWriter(MergeColumnWriteMetadata columnMetadata, int pageSize, int maxNumberOfTuples,
-            double tolerance, int maxLeafNodeSize) {
+            double tolerance, int maxLeafNodeSize, IColumnWriteContext writeContext) {
         this.columnMetadata = columnMetadata;
         this.maxLeafNodeSize = maxLeafNodeSize;
         List<IColumnTupleIterator> componentsTuplesList = columnMetadata.getComponentsTuples();
@@ -68,7 +69,7 @@
         }
         this.maxNumberOfTuples = getMaxNumberOfTuples(maxNumberOfTuples, totalNumberOfTuples, totalLength);
         this.writtenComponents = new RunLengthIntArray();
-        writer = new ColumnBatchWriter(columnMetadata.getMultiPageOpRef(), pageSize, tolerance);
+        writer = new ColumnBatchWriter(columnMetadata.getMultiPageOpRef(), pageSize, tolerance, writeContext);
         writtenComponents.reset();
         primaryKeyWriters = new IColumnValuesWriter[columnMetadata.getNumberOfPrimaryKeys()];
         for (int i = 0; i < primaryKeyWriters.length; i++) {
@@ -142,16 +143,17 @@
             orderedColumns.add(columnMetadata.getWriter(i));
         }
         writer.setPageZeroBuffer(pageZero, numberOfColumns, numberOfPrimaryKeys);
-        int allocatedSpace = writer.writePrimaryKeyColumns(primaryKeyWriters);
-        allocatedSpace += writer.writeColumns(orderedColumns);
+        writer.writePrimaryKeyColumns(primaryKeyWriters);
+        int totalLength = writer.writeColumns(orderedColumns);
 
         numberOfAntiMatter = 0;
-        return allocatedSpace;
+        return totalLength;
     }
 
     @Override
     public void close() {
         columnMetadata.close();
+        writer.close();
     }
 
     private void writePrimaryKeys(MergeColumnTupleReference columnTuple) throws HyracksDataException {
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/secondary/create/PrimaryScanColumnTupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/secondary/create/PrimaryScanColumnTupleProjector.java
index c86318e..57ad0ac 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/secondary/create/PrimaryScanColumnTupleProjector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/secondary/create/PrimaryScanColumnTupleProjector.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.column.operation.lsm.secondary.create;
 
+import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MODIFY;
+
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Collections;
@@ -40,7 +42,7 @@
             ARecordType requestedType) {
         projector = new QueryColumnTupleProjector(datasetType, numberOfPrimaryKeys, requestedType,
                 Collections.emptyMap(), NoOpColumnFilterEvaluatorFactory.INSTANCE,
-                NoOpColumnFilterEvaluatorFactory.INSTANCE, NoOpWarningCollector.INSTANCE, null);
+                NoOpColumnFilterEvaluatorFactory.INSTANCE, NoOpWarningCollector.INSTANCE, null, MODIFY);
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/secondary/upsert/UpsertPreviousColumnTupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/secondary/upsert/UpsertPreviousColumnTupleProjector.java
index 34b0291..2b95c1a 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/secondary/upsert/UpsertPreviousColumnTupleProjector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/secondary/upsert/UpsertPreviousColumnTupleProjector.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.column.operation.lsm.secondary.upsert;
 
+import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MODIFY;
+
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Collections;
@@ -42,7 +44,7 @@
         builder = new ArrayTupleBuilder(numberOfPrimaryKeys + 1);
         projector = new QueryColumnTupleProjector(datasetType, numberOfPrimaryKeys, requestedType,
                 Collections.emptyMap(), NoOpColumnFilterEvaluatorFactory.INSTANCE,
-                NoOpColumnFilterEvaluatorFactory.INSTANCE, NoOpWarningCollector.INSTANCE, null);
+                NoOpColumnFilterEvaluatorFactory.INSTANCE, NoOpWarningCollector.INSTANCE, null, MODIFY);
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java
index d50af38..a697e0d 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java
@@ -55,6 +55,7 @@
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
 import org.apache.hyracks.util.LogRedactionUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -78,9 +79,9 @@
             FieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, IColumnValuesReaderFactory readerFactory,
             IValueGetterFactory valueGetterFactory, IColumnFilterEvaluator normalizedFilterEvaluator,
             List<IColumnRangeFilterValueAccessor> filterValueAccessors,
-            IColumnIterableFilterEvaluator columnFilterEvaluator, List<IColumnValuesReader> filterColumnReaders)
-            throws HyracksDataException {
-        super(datasetType, metaType, primaryKeyReaders.length, serializedMetadata, -1);
+            IColumnIterableFilterEvaluator columnFilterEvaluator, List<IColumnValuesReader> filterColumnReaders,
+            ColumnProjectorType projectorType) throws HyracksDataException {
+        super(datasetType, metaType, primaryKeyReaders.length, serializedMetadata, -1, projectorType);
         this.fieldNamesDictionary = fieldNamesDictionary;
         this.primaryKeyReaders = primaryKeyReaders;
         this.normalizedFilterEvaluator = normalizedFilterEvaluator;
@@ -175,7 +176,7 @@
             Map<String, FunctionCallInformation> functionCallInfoMap,
             IColumnRangeFilterEvaluatorFactory normalizedEvaluatorFactory,
             IColumnIterableFilterEvaluatorFactory columnFilterEvaluatorFactory, IWarningCollector warningCollector,
-            IHyracksTaskContext context) throws IOException {
+            IHyracksTaskContext context, ColumnProjectorType projectorType) throws IOException {
         byte[] bytes = serializedMetadata.getByteArray();
         int offset = serializedMetadata.getStartOffset();
         int length = serializedMetadata.getLength();
@@ -230,7 +231,7 @@
 
         return new QueryColumnMetadata(datasetType, null, primaryKeyReaders, serializedMetadata, fieldNamesDictionary,
                 clippedRoot, readerFactory, valueGetterFactory, normalizedFilterEvaluator, filterValueAccessors,
-                columnFilterEvaluator, filterColumnReaders);
+                columnFilterEvaluator, filterColumnReaders, projectorType);
     }
 
     protected static ObjectSchemaNode clip(ARecordType requestedType, ObjectSchemaNode root,
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleProjector.java
index 369a891..d17cfbd 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleProjector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleProjector.java
@@ -37,6 +37,7 @@
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
 
@@ -47,6 +48,7 @@
     protected final Map<String, FunctionCallInformation> functionCallInfoMap;
     protected final IWarningCollector warningCollector;
     protected final IHyracksTaskContext context;
+    protected final ColumnProjectorType projectorType;
     protected final IColumnRangeFilterEvaluatorFactory normalizedFilterEvaluatorFactory;
     protected final IColumnIterableFilterEvaluatorFactory columnFilterEvaluatorFactory;
     private final AssembledTupleReference assembledTupleReference;
@@ -55,7 +57,7 @@
             Map<String, FunctionCallInformation> functionCallInfoMap,
             IColumnRangeFilterEvaluatorFactory normalizedFilterEvaluatorFactory,
             IColumnIterableFilterEvaluatorFactory columnFilterEvaluatorFactory, IWarningCollector warningCollector,
-            IHyracksTaskContext context) {
+            IHyracksTaskContext context, ColumnProjectorType projectorType) {
         this.datasetType = datasetType;
         this.numberOfPrimaryKeys = numberOfPrimaryKeys;
         this.requestedType = requestedType;
@@ -64,6 +66,7 @@
         this.columnFilterEvaluatorFactory = columnFilterEvaluatorFactory;
         this.warningCollector = warningCollector;
         this.context = context;
+        this.projectorType = projectorType;
         assembledTupleReference = new AssembledTupleReference(getNumberOfTupleFields());
     }
 
@@ -72,7 +75,8 @@
         try {
             return QueryColumnMetadata.create(datasetType, numberOfPrimaryKeys, serializedMetadata,
                     new ColumnValueReaderFactory(), ValueGetterFactory.INSTANCE, requestedType, functionCallInfoMap,
-                    normalizedFilterEvaluatorFactory, columnFilterEvaluatorFactory, warningCollector, context);
+                    normalizedFilterEvaluatorFactory, columnFilterEvaluatorFactory, warningCollector, context,
+                    projectorType);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleProjectorFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleProjectorFactory.java
index 0265a05..e333405 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleProjectorFactory.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleProjectorFactory.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.column.operation.query;
 
+import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.QUERY;
+
 import java.util.Map;
 
 import org.apache.asterix.column.filter.iterable.IColumnIterableFilterEvaluatorFactory;
@@ -61,7 +63,7 @@
         if (requestedMetaType == null) {
             // The dataset does not contain a meta part
             return new QueryColumnTupleProjector(datasetType, numberOfPrimaryKeys, requestedType, functionCallInfo,
-                    rangeFilterEvaluatorFactory, columnFilterEvaluatorFactory, warningCollector, context);
+                    rangeFilterEvaluatorFactory, columnFilterEvaluatorFactory, warningCollector, context, QUERY);
         }
         // The dataset has a meta part
         return new QueryColumnWithMetaTupleProjector(datasetType, metaType, numberOfPrimaryKeys, requestedType,
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java
index 34e40ff..8b23c05 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java
@@ -51,6 +51,7 @@
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
 
 /**
  * Query column metadata (with metaRecord)
@@ -63,10 +64,11 @@
             FieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, ObjectSchemaNode metaRoot,
             IColumnValuesReaderFactory readerFactory, IValueGetterFactory valueGetterFactory,
             IColumnFilterEvaluator filterEvaluator, List<IColumnRangeFilterValueAccessor> filterValueAccessors,
-            IColumnIterableFilterEvaluator columnFilterEvaluator, List<IColumnValuesReader> filterColumnReaders)
-            throws HyracksDataException {
+            IColumnIterableFilterEvaluator columnFilterEvaluator, List<IColumnValuesReader> filterColumnReaders,
+            ColumnProjectorType projectorType) throws HyracksDataException {
         super(datasetType, metaType, primaryKeyReaders, serializedMetadata, fieldNamesDictionary, root, readerFactory,
-                valueGetterFactory, filterEvaluator, filterValueAccessors, columnFilterEvaluator, filterColumnReaders);
+                valueGetterFactory, filterEvaluator, filterValueAccessors, columnFilterEvaluator, filterColumnReaders,
+                projectorType);
         metaAssembler = new ColumnAssembler(metaRoot, metaType, this, readerFactory, valueGetterFactory);
     }
 
@@ -121,7 +123,7 @@
             Map<String, FunctionCallInformation> functionCallInfo, ARecordType metaRequestedType,
             IColumnRangeFilterEvaluatorFactory normalizedEvaluatorFactory,
             IColumnIterableFilterEvaluatorFactory columnFilterEvaluatorFactory, IWarningCollector warningCollector,
-            IHyracksTaskContext context) throws IOException {
+            IHyracksTaskContext context, ColumnProjectorType projectorType) throws IOException {
         byte[] bytes = serializedMetadata.getByteArray();
         int offset = serializedMetadata.getStartOffset();
         int length = serializedMetadata.getLength();
@@ -179,6 +181,7 @@
 
         return new QueryColumnWithMetaMetadata(datasetType, metaType, primaryKeyReaders, serializedMetadata,
                 fieldNamesDictionary, clippedRoot, metaClippedRoot, readerFactory, valueGetterFactory,
-                normalizedFilterEvaluator, filterValueAccessors, columnFilterEvaluator, filterColumnReaders);
+                normalizedFilterEvaluator, filterValueAccessors, columnFilterEvaluator, filterColumnReaders,
+                projectorType);
     }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaTupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaTupleProjector.java
index f31c7cc..74524fe 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaTupleProjector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaTupleProjector.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.column.operation.query;
 
+import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.QUERY;
+
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Map;
@@ -47,7 +49,7 @@
             IColumnIterableFilterEvaluatorFactory columnFilterEvaluatorFactory, IWarningCollector warningCollector,
             IHyracksTaskContext context) {
         super(datasetType, numberOfPrimaryKeys, requestedType, functionCallInfoMap, filterEvaluator,
-                columnFilterEvaluatorFactory, warningCollector, context);
+                columnFilterEvaluatorFactory, warningCollector, context, QUERY);
         this.metaType = metaType;
         this.requestedMetaType = requestedMetaType;
     }
@@ -58,7 +60,7 @@
             return QueryColumnWithMetaMetadata.create(datasetType, metaType, numberOfPrimaryKeys, serializedMetadata,
                     new ColumnValueReaderFactory(), ValueGetterFactory.INSTANCE, requestedType, functionCallInfoMap,
                     requestedMetaType, normalizedFilterEvaluatorFactory, columnFilterEvaluatorFactory, warningCollector,
-                    context);
+                    context, projectorType);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java
index fc1173f..063743d 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java
@@ -30,15 +30,19 @@
      * Writes the primary keys' values to Page0
      *
      * @param primaryKeyWriters primary keys' writers
-     * @return the allocated space for the primary keys' writers
      */
-    int writePrimaryKeyColumns(IColumnValuesWriter[] primaryKeyWriters) throws HyracksDataException;
+    void writePrimaryKeyColumns(IColumnValuesWriter[] primaryKeyWriters) throws HyracksDataException;
 
     /**
      * Writes the non-key values to multiple pages
      *
      * @param nonKeysColumnWriters non-key values' writers
-     * @return the allocated space for the non-key values' writers
+     * @return length of all columns (that includes pageZero)
      */
     int writeColumns(PriorityQueue<IColumnValuesWriter> nonKeysColumnWriters) throws HyracksDataException;
+
+    /**
+     * Close the writer
+     */
+    void close();
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java
index 6fbdc27..1d8f684 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java
@@ -31,6 +31,7 @@
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
 
 /**
  * A writer for a batch columns' values
@@ -40,17 +41,19 @@
     private final MultiPersistentBufferBytesOutputStream columns;
     private final int pageSize;
     private final double tolerance;
+    private final IColumnWriteContext writeContext;
     private final IReservedPointer columnLengthPointer;
-
     private ByteBuffer pageZero;
     private int columnsOffset;
     private int filtersOffset;
     private int primaryKeysOffset;
     private int nonKeyColumnStartOffset;
 
-    public ColumnBatchWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int pageSize, double tolerance) {
+    public ColumnBatchWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int pageSize, double tolerance,
+            IColumnWriteContext writeContext) {
         this.pageSize = pageSize;
         this.tolerance = tolerance;
+        this.writeContext = writeContext;
         primaryKeys = new ByteBufferOutputStream();
         columns = new MultiPersistentBufferBytesOutputStream(multiPageOpRef);
         columnLengthPointer = columns.createPointer();
@@ -74,39 +77,48 @@
     }
 
     @Override
-    public int writePrimaryKeyColumns(IColumnValuesWriter[] primaryKeyWriters) throws HyracksDataException {
-        int allocatedSpace = 0;
+    public void writePrimaryKeyColumns(IColumnValuesWriter[] primaryKeyWriters) throws HyracksDataException {
         for (int i = 0; i < primaryKeyWriters.length; i++) {
             IColumnValuesWriter writer = primaryKeyWriters[i];
             setColumnOffset(i, primaryKeysOffset + primaryKeys.size());
             writer.flush(primaryKeys);
-            allocatedSpace += writer.getAllocatedSpace();
         }
-        return allocatedSpace;
     }
 
     @Override
     public int writeColumns(PriorityQueue<IColumnValuesWriter> nonKeysColumnWriters) throws HyracksDataException {
-        int allocatedSpace = 0;
         columns.reset();
         while (!nonKeysColumnWriters.isEmpty()) {
             IColumnValuesWriter writer = nonKeysColumnWriters.poll();
             writeColumn(writer);
-            allocatedSpace += writer.getAllocatedSpace();
         }
-        return allocatedSpace;
+
+        // compute the final length
+        int totalLength = nonKeyColumnStartOffset + columns.size();
+        // reset to ensure the last buffer's position and limit are set appropriately
+        columns.reset();
+        return totalLength;
+    }
+
+    @Override
+    public void close() {
+        writeContext.close();
     }
 
     private void writeColumn(IColumnValuesWriter writer) throws HyracksDataException {
+        boolean overlapping = true;
         if (!hasEnoughSpace(columns.getCurrentBufferPosition(), writer)) {
             /*
              * We reset the columns stream to write all pages and confiscate a new buffer to minimize splitting
              * the columns value into multiple pages.
              */
+            overlapping = false;
             nonKeyColumnStartOffset += columns.capacity();
             columns.reset();
         }
 
+        int columnIndex = writer.getColumnIndex();
+        writeContext.startWritingColumn(columnIndex, overlapping);
         int columnRelativeOffset = columns.size();
         columns.reserveInteger(columnLengthPointer);
         setColumnOffset(writer.getColumnIndex(), nonKeyColumnStartOffset + columnRelativeOffset);
@@ -116,10 +128,19 @@
 
         int length = columns.size() - columnRelativeOffset;
         columnLengthPointer.setInteger(length);
+        writeContext.endWritingColumn(columnIndex, length);
     }
 
     private boolean hasEnoughSpace(int bufferPosition, IColumnValuesWriter columnWriter) {
-        //Estimated size mostly overestimate the size
+        if (bufferPosition == 0) {
+            // if the current buffer is empty, then use it
+            return true;
+        } else if (tolerance == 1.0d) {
+            // if tolerance is 100%, then it should avoid doing any calculations and start a with a new page
+            return false;
+        }
+
+        // Estimated size mostly overestimate the size
         int columnSize = columnWriter.getEstimatedSize();
         float remainingPercentage = (pageSize - bufferPosition) / (float) pageSize;
         if (columnSize > pageSize) {
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
index c4f8727..55097c5 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
@@ -62,6 +62,7 @@
 import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.write.DefaultColumnWriteContext;
 import org.apache.hyracks.util.StorageUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -151,7 +152,7 @@
             int numberOfTuplesToWrite) throws IOException {
         IColumnWriteMultiPageOp multiPageOp = columnMetadata.getMultiPageOpRef().getValue();
         FlushColumnTupleWriter writer = new FlushColumnTupleWriter(columnMetadata, PAGE_SIZE, MAX_NUMBER_OF_TUPLES,
-                TOLERANCE, MAX_LEAF_NODE_SIZE);
+                TOLERANCE, MAX_LEAF_NODE_SIZE, DefaultColumnWriteContext.INSTANCE);
 
         try {
             return writeTuples(fileId, writer, records, numberOfTuplesToWrite, multiPageOp);
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/FlushLargeTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/FlushLargeTest.java
index 6a0256c..296a09f 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/FlushLargeTest.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/FlushLargeTest.java
@@ -34,6 +34,7 @@
 import org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -63,12 +64,12 @@
         FlushColumnMetadata columnMetadata = prepareNewFile(fileId);
         List<IValueReference> record = getParsedRecords();
         List<DummyPage> pageZeros = transform(fileId, columnMetadata, record, numberOfTuplesToWrite);
-        QueryColumnMetadata readMetadata =
-                QueryColumnMetadata.create(columnMetadata.getDatasetType(), columnMetadata.getNumberOfPrimaryKeys(),
-                        columnMetadata.serializeColumnsMetadata(), new ColumnValueReaderFactory(),
-                        ValueGetterFactory.INSTANCE, ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE,
-                        Collections.emptyMap(), NoOpColumnFilterEvaluatorFactory.INSTANCE,
-                        NoOpColumnFilterEvaluatorFactory.INSTANCE, NoOpWarningCollector.INSTANCE, null);
+        QueryColumnMetadata readMetadata = QueryColumnMetadata.create(columnMetadata.getDatasetType(),
+                columnMetadata.getNumberOfPrimaryKeys(), columnMetadata.serializeColumnsMetadata(),
+                new ColumnValueReaderFactory(), ValueGetterFactory.INSTANCE,
+                ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE, Collections.emptyMap(),
+                NoOpColumnFilterEvaluatorFactory.INSTANCE, NoOpColumnFilterEvaluatorFactory.INSTANCE,
+                NoOpWarningCollector.INSTANCE, null, ColumnProjectorType.QUERY);
         writeResult(fileId, readMetadata, pageZeros);
         testCase.compareRepeated(numberOfTuplesToWrite);
     }
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/FlushSmallTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/FlushSmallTest.java
index 8b45142..463b89f 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/FlushSmallTest.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/FlushSmallTest.java
@@ -34,6 +34,7 @@
 import org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -62,12 +63,12 @@
         FlushColumnMetadata columnMetadata = prepareNewFile(fileId);
         List<IValueReference> record = getParsedRecords();
         List<DummyPage> pageZeros = transform(fileId, columnMetadata, record, record.size());
-        QueryColumnMetadata readMetadata =
-                QueryColumnMetadata.create(columnMetadata.getDatasetType(), columnMetadata.getNumberOfPrimaryKeys(),
-                        columnMetadata.serializeColumnsMetadata(), new ColumnValueReaderFactory(),
-                        ValueGetterFactory.INSTANCE, ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE,
-                        Collections.emptyMap(), NoOpColumnFilterEvaluatorFactory.INSTANCE,
-                        NoOpColumnFilterEvaluatorFactory.INSTANCE, NoOpWarningCollector.INSTANCE, null);
+        QueryColumnMetadata readMetadata = QueryColumnMetadata.create(columnMetadata.getDatasetType(),
+                columnMetadata.getNumberOfPrimaryKeys(), columnMetadata.serializeColumnsMetadata(),
+                new ColumnValueReaderFactory(), ValueGetterFactory.INSTANCE,
+                ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE, Collections.emptyMap(),
+                NoOpColumnFilterEvaluatorFactory.INSTANCE, NoOpColumnFilterEvaluatorFactory.INSTANCE,
+                NoOpWarningCollector.INSTANCE, null, ColumnProjectorType.QUERY);
         writeResult(fileId, readMetadata, pageZeros);
         testCase.compare();
     }
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java
index 80106dc..ab06484 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java
@@ -49,6 +49,7 @@
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -99,7 +100,7 @@
                 columnMetadata.getNumberOfPrimaryKeys(), columnMetadata.serializeColumnsMetadata(), readerFactory,
                 DummyValueGetterFactory.INSTANCE, ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE, Collections.emptyMap(),
                 NoOpColumnFilterEvaluatorFactory.INSTANCE, NoOpColumnFilterEvaluatorFactory.INSTANCE,
-                NoOpWarningCollector.INSTANCE, null);
+                NoOpWarningCollector.INSTANCE, null, ColumnProjectorType.QUERY);
         AbstractBytesInputStream[] streams = new AbstractBytesInputStream[columnMetadata.getNumberOfColumns()];
         Arrays.fill(streams, DummyBytesInputStream.INSTANCE);
 
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/NoOpColumnBatchWriter.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/NoOpColumnBatchWriter.java
index eab824aa..234f804 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/NoOpColumnBatchWriter.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/NoOpColumnBatchWriter.java
@@ -33,16 +33,21 @@
 
     @Override
     public void setPageZeroBuffer(ByteBuffer pageZeroBuffer, int numberOfColumns, int numberOfPrimaryKeys) {
-
+        // NoOp
     }
 
     @Override
-    public int writePrimaryKeyColumns(IColumnValuesWriter[] primaryKeyWriters) throws HyracksDataException {
-        return 0;
+    public void writePrimaryKeyColumns(IColumnValuesWriter[] primaryKeyWriters) throws HyracksDataException {
+        // NoOp
     }
 
     @Override
     public int writeColumns(PriorityQueue<IColumnValuesWriter> nonKeysColumnWriters) throws HyracksDataException {
         return 0;
     }
+
+    @Override
+    public void close() {
+        // NoOp
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
index 43aa873..bbd8d87 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
@@ -93,6 +93,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-cloud</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
     </dependency>
@@ -104,5 +109,10 @@
       <groupId>it.unimi.dsi</groupId>
       <artifactId>fastutil-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/ColumnProjectorType.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/ColumnProjectorType.java
new file mode 100644
index 0000000..29258b5
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/ColumnProjectorType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.api.projection;
+
+public enum ColumnProjectorType {
+    MERGE,
+    QUERY,
+    MODIFY
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/IColumnProjectionInfo.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/IColumnProjectionInfo.java
index 641f704..b1bfe87 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/IColumnProjectionInfo.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/IColumnProjectionInfo.java
@@ -48,4 +48,9 @@
      * @return number of filtered columns
      */
     int getNumberOfFilteredColumns();
+
+    /**
+     * @return the type of {@link IColumnTupleProjector} that created this projection info
+     */
+    ColumnProjectorType getProjectorType();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/CloudColumnIndexDiskCacheManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/CloudColumnIndexDiskCacheManager.java
new file mode 100644
index 0000000..38a1713
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/CloudColumnIndexDiskCacheManager.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud;
+
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.sweeper.SweepContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnReadContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read.CloudColumnReadContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read.DefaultColumnReadContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.write.CloudColumnWriteContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweepPlanner;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweeper;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.disk.ISweepContext;
+
+/**
+ * The disk manager cannot be shared among different partitions as columns are local to each partition.
+ * For example, column 9 in partition 0 corresponds to "salary" while column 9 in partition 1 corresponds to "age".
+ */
+public final class CloudColumnIndexDiskCacheManager implements IColumnIndexDiskCacheManager {
+    private final IColumnTupleProjector sweepProjector;
+    private final IPhysicalDrive drive;
+    private final ColumnSweepPlanner planner;
+    private final ColumnSweeper sweeper;
+
+    public CloudColumnIndexDiskCacheManager(int numberOfPrimaryKeys, IColumnTupleProjector sweepProjector,
+            IPhysicalDrive drive) {
+        this.sweepProjector = sweepProjector;
+        this.drive = drive;
+        planner = new ColumnSweepPlanner(numberOfPrimaryKeys, System::nanoTime);
+        sweeper = new ColumnSweeper(numberOfPrimaryKeys);
+    }
+
+    @Override
+    public void activate(int numberOfColumns, List<ILSMDiskComponent> diskComponents, IBufferCache bufferCache)
+            throws HyracksDataException {
+        planner.onActivate(numberOfColumns, diskComponents, sweepProjector, bufferCache);
+    }
+
+    @Override
+    public IColumnWriteContext createWriteContext(int numberOfColumns, LSMIOOperationType operationType) {
+        return new CloudColumnWriteContext(drive, planner, numberOfColumns);
+    }
+
+    @Override
+    public IColumnReadContext createReadContext(IColumnProjectionInfo projectionInfo) {
+        ColumnProjectorType projectorType = projectionInfo.getProjectorType();
+        if (projectorType == ColumnProjectorType.QUERY) {
+            planner.access(projectionInfo, drive.hasSpace());
+        } else if (projectorType == ColumnProjectorType.MODIFY) {
+            planner.setIndexedColumns(projectionInfo);
+            // Requested (and indexed) columns will be persisted if space permits
+            return DefaultColumnReadContext.INSTANCE;
+        }
+        return new CloudColumnReadContext(projectionInfo, drive, planner.getPlanCopy());
+    }
+
+    @Override
+    public boolean isActive() {
+        return planner.isActive();
+    }
+
+    @Override
+    public boolean isSweepable() {
+        return true;
+    }
+
+    @Override
+    public boolean prepareSweepPlan() {
+        return planner.plan();
+    }
+
+    @Override
+    public long sweep(ISweepContext context) throws HyracksDataException {
+        SweepContext sweepContext = (SweepContext) context;
+        return sweeper.sweep(planner.getPlanCopy(), sweepContext, sweepProjector);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
new file mode 100644
index 0000000..9fc60fa
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud;
+
+import static org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweeperUtil.EMPTY;
+import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getColumnPageIndex;
+import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getNumberOfPages;
+
+import java.util.BitSet;
+
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read.CloudColumnReadContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweepPlanner;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweeper;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+
+import it.unimi.dsi.fastutil.ints.IntArrays;
+import it.unimi.dsi.fastutil.longs.LongArrays;
+import it.unimi.dsi.fastutil.longs.LongComparator;
+
+/**
+ * Computes columns offsets, lengths, and pages
+ */
+public final class ColumnRanges {
+    private static final LongComparator OFFSET_COMPARATOR =
+            (x, y) -> Integer.compare(getOffsetFromPair(x), getOffsetFromPair(y));
+    private final int numberOfPrimaryKeys;
+
+    // For eviction
+    private final BitSet nonEvictablePages;
+
+    // For Query
+    private final BitSet evictablePages;
+    private final BitSet cloudOnlyPages;
+
+    private ColumnBTreeReadLeafFrame leafFrame;
+    private long[] offsetColumnIndexPairs;
+    private int[] lengths;
+    private int[] columnsOrder;
+    private int pageZeroId;
+
+    public ColumnRanges(int numberOfPrimaryKeys) {
+        this.numberOfPrimaryKeys = numberOfPrimaryKeys;
+
+        offsetColumnIndexPairs = new long[0];
+        lengths = new int[0];
+        columnsOrder = new int[0];
+
+        nonEvictablePages = new BitSet();
+
+        evictablePages = new BitSet();
+        cloudOnlyPages = new BitSet();
+    }
+
+    /**
+     * @return number of primary keys
+     */
+    public int getNumberOfPrimaryKeys() {
+        return numberOfPrimaryKeys;
+    }
+
+    /**
+     * Reset ranges for initializing {@link ColumnSweepPlanner}
+     *
+     * @param leafFrame to compute the ranges for
+     */
+    public void reset(ColumnBTreeReadLeafFrame leafFrame) {
+        reset(leafFrame, EMPTY, EMPTY, EMPTY);
+    }
+
+    /**
+     * Reset column ranges for {@link ColumnSweeper}
+     *
+     * @param leafFrame to compute the ranges for
+     * @param plan      eviction plan
+     */
+    public void reset(ColumnBTreeReadLeafFrame leafFrame, BitSet plan) {
+        reset(leafFrame, plan, EMPTY, EMPTY);
+    }
+
+    /**
+     * Reset ranges for {@link CloudColumnReadContext}
+     *
+     * @param leafFrame        to compute the ranges for
+     * @param requestedColumns required columns
+     * @param evictableColumns columns that are or will be evicted
+     * @param cloudOnlyColumns locked columns that cannot be read from a local disk
+     */
+    public void reset(ColumnBTreeReadLeafFrame leafFrame, BitSet requestedColumns, BitSet evictableColumns,
+            BitSet cloudOnlyColumns) {
+        // Set leafFrame
+        this.leafFrame = leafFrame;
+        // Ensure arrays capacities (given the leafFrame's columns and pages)
+        init();
+
+        // Get the number of columns in a page
+        int numberOfColumns = leafFrame.getNumberOfColumns();
+        for (int i = 0; i < numberOfColumns; i++) {
+            long offset = leafFrame.getColumnOffset(i);
+            // Set the first 32-bits to the offset and the second 32-bits to columnIndex
+            offsetColumnIndexPairs[i] = (offset << 32) + i;
+        }
+
+        // Set artificial offset to determine the last column's length
+        offsetColumnIndexPairs[numberOfColumns] = (leafFrame.getMegaLeafNodeLengthInBytes() << 32) + numberOfColumns;
+
+        // Sort the pairs by offset (i.e., lowest offset first)
+        LongArrays.stableSort(offsetColumnIndexPairs, 0, numberOfColumns, OFFSET_COMPARATOR);
+
+        int columnOrdinal = 0;
+        for (int i = 0; i < numberOfColumns; i++) {
+            int columnIndex = getColumnIndexFromPair(offsetColumnIndexPairs[i]);
+            int offset = getOffsetFromPair(offsetColumnIndexPairs[i]);
+            int nextOffset = getOffsetFromPair(offsetColumnIndexPairs[i + 1]);
+
+            // Compute the column's length in bytes (set 0 for PKs)
+            int length = columnIndex < numberOfPrimaryKeys ? 0 : nextOffset - offset;
+            lengths[columnIndex] = length;
+
+            // Get start page ID (given the computed length above)
+            int startPageId = getColumnStartPageIndex(columnIndex);
+            // Get the number of pages (given the computed length above)
+            int numberOfPages = getColumnNumberOfPages(columnIndex);
+
+            if (columnIndex >= numberOfPrimaryKeys && requestedColumns.get(columnIndex)) {
+                // Set column index
+                columnsOrder[columnOrdinal++] = columnIndex;
+                // Compute cloud-only and evictable pages
+                setCloudOnlyAndEvictablePages(columnIndex, cloudOnlyColumns, evictableColumns, startPageId,
+                        numberOfPages);
+                // A requested column. Keep its pages as requested
+                continue;
+            }
+
+            // Mark the page as non-evictable
+            for (int j = startPageId; j < startPageId + numberOfPages; j++) {
+                nonEvictablePages.set(j);
+            }
+        }
+
+        // Bound the nonRequestedPages to the number of pages in the mega leaf node
+        nonEvictablePages.set(leafFrame.getMegaLeafNodeNumberOfPages());
+        // to indicate the end
+        columnsOrder[columnOrdinal] = -1;
+    }
+
+    /**
+     * First page of a column
+     *
+     * @param columnIndex column index
+     * @return pageID
+     */
+    public int getColumnStartPageIndex(int columnIndex) {
+        int pageSize = leafFrame.getBuffer().capacity();
+        return getColumnPageIndex(leafFrame.getColumnOffset(columnIndex), pageSize);
+    }
+
+    /**
+     * Length of a column in pages
+     *
+     * @param columnIndex column index
+     * @return number of pages
+     */
+    public int getColumnNumberOfPages(int columnIndex) {
+        int pageSize = leafFrame.getBuffer().capacity();
+        int numberOfPages = getNumberOfPages(getColumnLength(columnIndex), pageSize);
+        return numberOfPages == 0 ? 1 : numberOfPages;
+    }
+
+    /**
+     * Length of a column in bytes
+     *
+     * @param columnIndex column index
+     * @return number of bytes
+     */
+    public int getColumnLength(int columnIndex) {
+        return lengths[columnIndex];
+    }
+
+    /**
+     * Returns true if the page is meant to be read from the cloud only
+     *
+     * @param pageId page ID
+     * @return true of the page should be read from the cloud, false otherwise
+     * @see #reset(ColumnBTreeReadLeafFrame, BitSet, BitSet, BitSet)
+     */
+    public boolean isCloudOnly(int pageId) {
+        // Compute the relative page ID for this mega leaf node
+        int relativePageId = pageId - pageZeroId;
+        return cloudOnlyPages.get(relativePageId);
+    }
+
+    /**
+     * Whether the page has been or will be evicted
+     *
+     * @param pageId page ID
+     * @return true of the page was or will be evicted, false otherwise
+     */
+    public boolean isEvictable(int pageId) {
+        int relativePageId = pageId - pageZeroId;
+        return evictablePages.get(relativePageId);
+    }
+
+    /**
+     * @return Bitset of all non-requested pages
+     */
+    public BitSet getNonEvictablePages() {
+        return nonEvictablePages;
+    }
+
+    /**
+     * @return you the order of columns that should be read in order to ensure (semi) sequential access.
+     * Sequential means page X is read before page Y, forall X and Y where X < Y
+     */
+    public int[] getColumnsOrder() {
+        return columnsOrder;
+    }
+
+    private void init() {
+        int numberOfColumns = leafFrame.getNumberOfColumns();
+        offsetColumnIndexPairs = LongArrays.ensureCapacity(offsetColumnIndexPairs, numberOfColumns + 1, 0);
+        lengths = IntArrays.ensureCapacity(lengths, numberOfColumns, 0);
+        columnsOrder = IntArrays.ensureCapacity(columnsOrder, numberOfColumns + 1, 0);
+        nonEvictablePages.clear();
+        evictablePages.clear();
+        cloudOnlyPages.clear();
+        pageZeroId = leafFrame.getPageId();
+    }
+
+    private static int getOffsetFromPair(long pair) {
+        return (int) (pair >> 32);
+    }
+
+    private static int getColumnIndexFromPair(long pair) {
+        return (int) pair;
+    }
+
+    private void setCloudOnlyAndEvictablePages(int columnIndex, BitSet cloudOnlyColumns, BitSet evictableColumns,
+            int startPageId, int numberOfPages) {
+        if (evictableColumns == EMPTY && cloudOnlyColumns == EMPTY) {
+            return;
+        }
+
+        // Find pages that meant to be read from the cloud only or are evictable
+        boolean cloudOnly = cloudOnlyColumns.get(columnIndex);
+        boolean evictable = evictableColumns.get(columnIndex);
+        if (cloudOnly || evictable) {
+            for (int j = startPageId; j < startPageId + numberOfPages; j++) {
+                if (cloudOnly) {
+                    cloudOnlyPages.set(j);
+                } else {
+                    evictablePages.set(j);
+                }
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        int numberOfPages = leafFrame.getMegaLeafNodeNumberOfPages();
+        builder.append("       ");
+        for (int i = 0; i < numberOfPages; i++) {
+            builder.append(String.format("%02d", i));
+            builder.append("  ");
+        }
+
+        builder.append('\n');
+        for (int i = 0; i < leafFrame.getNumberOfColumns(); i++) {
+            builder.append(String.format("%03d", i));
+            builder.append(":");
+            int startPageId = getColumnStartPageIndex(i);
+            int columnPagesCount = getColumnNumberOfPages(i);
+            printColumnPages(builder, numberOfPages, startPageId, columnPagesCount);
+        }
+
+        builder.append("nonEvictablePages: ");
+        builder.append(nonEvictablePages);
+        builder.append('\n');
+        builder.append("evictablePages: ");
+        builder.append(evictablePages);
+        builder.append('\n');
+        builder.append("cloudOnlyPages: ");
+        builder.append(cloudOnlyPages);
+
+        return builder.toString();
+    }
+
+    private void printColumnPages(StringBuilder builder, int numberOfPages, int startPageId, int columnPagesCount) {
+        for (int i = 0; i < numberOfPages; i++) {
+            builder.append("   ");
+            if (i >= startPageId && i < startPageId + columnPagesCount) {
+                builder.append(1);
+            } else {
+                builder.append(0);
+            }
+        }
+        builder.append('\n');
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
new file mode 100644
index 0000000..af69fe5
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
+
+import static org.apache.hyracks.cloud.buffercache.context.DefaultCloudReadContext.readAndPersistPage;
+import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MODIFY;
+import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.QUERY;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
+import org.apache.hyracks.cloud.buffercache.page.ISweepLockInfo;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnReadContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweepLockInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
+
+@NotThreadSafe
+public final class CloudColumnReadContext implements IColumnReadContext {
+    private final ColumnProjectorType operation;
+    private final IPhysicalDrive drive;
+    private final BitSet plan;
+    private final BitSet cloudOnlyColumns;
+    private final ColumnRanges columnRanges;
+    private final CloudMegaPageReadContext columnCtx;
+    private final List<ICachedPage> pinnedPages;
+    private final BitSet projectedColumns;
+
+    public CloudColumnReadContext(IColumnProjectionInfo projectionInfo, IPhysicalDrive drive, BitSet plan) {
+        this.operation = projectionInfo.getProjectorType();
+        this.drive = drive;
+        this.plan = plan;
+        columnRanges = new ColumnRanges(projectionInfo.getNumberOfPrimaryKeys());
+        cloudOnlyColumns = new BitSet();
+        columnCtx = new CloudMegaPageReadContext(operation, columnRanges, drive);
+        pinnedPages = new ArrayList<>();
+        projectedColumns = new BitSet();
+        if (operation == QUERY || operation == MODIFY) {
+            for (int i = 0; i < projectionInfo.getNumberOfProjectedColumns(); i++) {
+                int columnIndex = projectionInfo.getColumnIndex(i);
+                if (columnIndex >= 0) {
+                    projectedColumns.set(columnIndex);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void onPin(ICachedPage page) {
+        CloudCachedPage cloudPage = (CloudCachedPage) page;
+        ISweepLockInfo lockTest = cloudPage.beforeRead();
+        if (lockTest.isLocked()) {
+            ColumnSweepLockInfo lockedColumns = (ColumnSweepLockInfo) lockTest;
+            lockedColumns.getLockedColumns(cloudOnlyColumns);
+        }
+    }
+
+    @Override
+    public void onUnpin(ICachedPage page) {
+        CloudCachedPage cloudPage = (CloudCachedPage) page;
+        cloudPage.afterRead();
+    }
+
+    @Override
+    public boolean isNewPage() {
+        return false;
+    }
+
+    @Override
+    public boolean incrementStats() {
+        return true;
+    }
+
+    @Override
+    public ByteBuffer processHeader(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
+            CachedPage cPage) throws HyracksDataException {
+        // Page zero will be persisted (always) if free space permits
+        return readAndPersistPage(ioManager, fileHandle, header, cPage, drive.hasSpace());
+    }
+
+    @Override
+    public ICachedPage pinNext(ColumnBTreeReadLeafFrame leafFrame, IBufferCache bufferCache, int fileId)
+            throws HyracksDataException {
+        // TODO do we support prefetching?
+        ICachedPage nextPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, leafFrame.getNextLeaf()), this);
+        release(bufferCache);
+        bufferCache.unpin(leafFrame.getPage(), this);
+        leafFrame.setPage(nextPage);
+        return nextPage;
+    }
+
+    @Override
+    public void prepareColumns(ColumnBTreeReadLeafFrame leafFrame, IBufferCache bufferCache, int fileId)
+            throws HyracksDataException {
+        if (leafFrame.getTupleCount() == 0) {
+            return;
+        }
+
+        // TODO handle prefetch if supported
+
+        columnRanges.reset(leafFrame, projectedColumns, plan, cloudOnlyColumns);
+        int pageZeroId = leafFrame.getPageId();
+        int[] columnsOrders = columnRanges.getColumnsOrder();
+        int i = 0;
+        int columnIndex = columnsOrders[i];
+        while (columnIndex > -1) {
+            if (columnIndex < columnRanges.getNumberOfPrimaryKeys()) {
+                columnIndex = columnsOrders[++i];
+                continue;
+            }
+
+            int startPageId = columnRanges.getColumnStartPageIndex(columnIndex);
+            // Will increment the number pages if the next column's pages are contiguous to this column's pages
+            int numberOfPages = columnRanges.getColumnNumberOfPages(columnIndex);
+
+            // Advance to the next column to check if it has contiguous pages
+            columnIndex = columnsOrders[++i];
+            while (columnIndex > -1) {
+                // Get the next column's start page ID
+                int nextStartPageId = columnRanges.getColumnStartPageIndex(columnIndex);
+                if (nextStartPageId > startPageId + numberOfPages + 1) {
+                    // The next startPageId is not contiguous, stop.
+                    break;
+                }
+
+                // Last page of this column
+                int nextLastPage = nextStartPageId + columnRanges.getColumnNumberOfPages(columnIndex);
+                // The next column's pages are contiguous. Combine its ranges with the previous one.
+                numberOfPages = nextLastPage - startPageId;
+                // Advance to the next column
+                columnIndex = columnsOrders[++i];
+            }
+
+            columnCtx.prepare(numberOfPages);
+            pin(bufferCache, fileId, pageZeroId, startPageId, numberOfPages);
+        }
+    }
+
+    private void pin(IBufferCache bufferCache, int fileId, int pageZeroId, int start, int numOfRequestedPages)
+            throws HyracksDataException {
+        for (int i = start; i < start + numOfRequestedPages; i++) {
+            long dpid = BufferedFileHandle.getDiskPageId(fileId, pageZeroId + i);
+            pinnedPages.add(bufferCache.pin(dpid, columnCtx));
+        }
+    }
+
+    @Override
+    public void release(IBufferCache bufferCache) throws HyracksDataException {
+        // Release might differ in the future if prefetching is supported
+        close(bufferCache);
+    }
+
+    @Override
+    public void close(IBufferCache bufferCache) throws HyracksDataException {
+        release(pinnedPages, bufferCache, columnCtx);
+        columnCtx.close();
+    }
+
+    private static void release(List<ICachedPage> pinnedPages, IBufferCache bufferCache, IBufferCacheReadContext ctx)
+            throws HyracksDataException {
+        for (int i = 0; i < pinnedPages.size(); i++) {
+            bufferCache.unpin(pinnedPages.get(i), ctx);
+        }
+        pinnedPages.clear();
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
new file mode 100644
index 0000000..0f0b0b9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
+
+import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MERGE;
+import static org.apache.hyracks.storage.common.buffercache.IBufferCache.RESERVED_HEADER_BYTES;
+import static org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider.DEFAULT;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.buffercache.context.BufferCacheCloudReadContextUtil;
+import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
+import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+@NotThreadSafe
+final class CloudMegaPageReadContext implements IBufferCacheReadContext {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final ColumnProjectorType operation;
+    private final ColumnRanges columnRanges;
+    private final IPhysicalDrive drive;
+    private int numberOfContiguousPages;
+    private int pageCounter;
+    private InputStream gapStream;
+
+    CloudMegaPageReadContext(ColumnProjectorType operation, ColumnRanges columnRanges, IPhysicalDrive drive) {
+        this.operation = operation;
+        this.columnRanges = columnRanges;
+        this.drive = drive;
+    }
+
+    public void prepare(int numberOfContiguousPages) throws HyracksDataException {
+        close();
+        this.numberOfContiguousPages = numberOfContiguousPages;
+        pageCounter = 0;
+    }
+
+    @Override
+    public void onPin(ICachedPage page) throws HyracksDataException {
+        CloudCachedPage cachedPage = (CloudCachedPage) page;
+        if (gapStream != null && cachedPage.skipCloudStream()) {
+            /*
+             * This page is requested but the buffer cache has a valid copy in memory. Also, the page itself was
+             * requested to be read from the cloud. Since this page is valid, no buffer cache read() will be performed.
+             * As the buffer cache read() is also responsible for persisting the bytes read from the cloud, we can end
+             * up writing the bytes of this page in the position of another page. Therefore, we should skip the bytes
+             * for this particular page to avoid placing the bytes of this page into another page's position.
+             */
+            try {
+                long remaining = cachedPage.getCompressedPageSize();
+                while (remaining > 0) {
+                    remaining -= gapStream.skip(remaining);
+                }
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
+        }
+    }
+
+    @Override
+    public void onUnpin(ICachedPage page) {
+        // NoOp
+    }
+
+    @Override
+    public boolean isNewPage() {
+        return false;
+    }
+
+    @Override
+    public boolean incrementStats() {
+        return true;
+    }
+
+    @Override
+    public ByteBuffer processHeader(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
+            CachedPage cPage) throws HyracksDataException {
+        boolean empty = BufferCacheCloudReadContextUtil.isEmpty(header);
+        int pageId = BufferedFileHandle.getPageId(cPage.getDiskPageId());
+        boolean cloudOnly = columnRanges.isCloudOnly(pageId);
+        ByteBuffer buffer;
+        if (empty || cloudOnly || gapStream != null) {
+            boolean evictable = columnRanges.isEvictable(pageId);
+            /*
+             * Persist iff the following conditions are satisfied:
+             * - The page is empty
+             * - The page is not being evicted (cloudOnly)
+             * - The page is not planned for eviction (evictable)
+             * - The operation is not a merge operation (the component will be deleted anyway)
+             * - The disk has space
+             *
+             * Note: 'emtpy' can be false while 'cloudOnly is true'. We cannot read from disk as the page can be
+             * evicted at any moment. In other words, the sweeper told us that it is going to evict this page; hence
+             * 'cloudOnly' is true.
+             */
+            boolean persist = empty && !cloudOnly && !evictable && operation != MERGE && drive.hasSpace();
+            buffer = readFromStream(ioManager, fileHandle, header, cPage, persist);
+            buffer.position(RESERVED_HEADER_BYTES);
+        } else {
+            /*
+             * Here we can find a page that is planned for eviction, but it has not being evicted yet
+             * (i.e., empty = false). This could happen if the cursor is at a point the sweeper hasn't
+             * reached yet (i.e., cloudOnly = false).
+             */
+            buffer = DEFAULT.processHeader(ioManager, fileHandle, header, cPage);
+        }
+
+        if (++pageCounter == numberOfContiguousPages) {
+            close();
+        }
+
+        return buffer;
+    }
+
+    void close() throws HyracksDataException {
+        if (gapStream != null) {
+            try {
+                gapStream.close();
+                gapStream = null;
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
+        }
+    }
+
+    private ByteBuffer readFromStream(IOManager ioManager, BufferedFileHandle fileHandle,
+            BufferCacheHeaderHelper header, CachedPage cPage, boolean persist) throws HyracksDataException {
+        InputStream stream = getOrCreateStream(ioManager, fileHandle, cPage);
+        ByteBuffer buffer = header.getBuffer();
+        buffer.position(0);
+        try {
+            while (buffer.remaining() != 0) {
+                int length = stream.read(buffer.array(), buffer.position(), buffer.remaining());
+                if (length < 0) {
+                    throw new IllegalStateException("Stream should not be empty!");
+                }
+                buffer.position(buffer.position() + length);
+            }
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+
+        buffer.flip();
+
+        if (persist) {
+            long offset = cPage.getCompressedPageOffset();
+            ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
+            BufferCacheCloudReadContextUtil.persist(cloudIOManager, fileHandle.getFileHandle(), buffer, offset);
+        }
+
+        return buffer;
+    }
+
+    private InputStream getOrCreateStream(IOManager ioManager, BufferedFileHandle fileHandle, CachedPage cPage)
+            throws HyracksDataException {
+        if (gapStream != null) {
+            return gapStream;
+        }
+
+        LOGGER.info("Cloud stream read for {} pages", numberOfContiguousPages - pageCounter);
+        int requiredNumOfPages = numberOfContiguousPages - pageCounter;
+        long offset = cPage.getCompressedPageOffset();
+        int pageId = BufferedFileHandle.getPageId(cPage.getDiskPageId());
+        long length = fileHandle.getPagesTotalSize(pageId, requiredNumOfPages);
+
+        ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
+        gapStream = cloudIOManager.cloudRead(fileHandle.getFileHandle(), offset, length);
+
+        return gapStream;
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/write/CloudColumnWriteContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/write/CloudColumnWriteContext.java
new file mode 100644
index 0000000..dcde833
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/write/CloudColumnWriteContext.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.write;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.cloud.buffercache.context.DefaultCloudOnlyWriteContext;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweepPlanner;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheWriteContext;
+import org.apache.hyracks.storage.common.buffercache.context.write.DefaultBufferCacheWriteContext;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+
+import it.unimi.dsi.fastutil.ints.IntArrays;
+import it.unimi.dsi.fastutil.ints.IntSet;
+
+public final class CloudColumnWriteContext implements IColumnWriteContext {
+    private static final int INITIAL_NUMBER_OF_COLUMNS = 32;
+    private final IPhysicalDrive drive;
+    private final ColumnSweepPlanner planner;
+    private final BitSet plan;
+    private final IntSet indexedColumns;
+    private int[] sizes;
+    private int numberOfColumns;
+    private IBufferCacheWriteContext currentContext;
+    /**
+     * writeAndSwap = true means the next call to write() will persist the page locally and swap to cloud-only
+     * writer (i.e., the following pages will be written in the cloud but not locally).
+     * <p>
+     * 'writeAndSwap' is set to 'true' iff the previous column's last page should be persisted AND it is
+     * 'overlapping' with this column's first page.
+     */
+    private boolean writeLocallyAndSwitchToCloudOnly;
+
+    public CloudColumnWriteContext(IPhysicalDrive drive, ColumnSweepPlanner planner, int numberOfColumns) {
+        this.drive = drive;
+        this.planner = planner;
+        this.plan = planner.getPlanCopy();
+        this.indexedColumns = planner.getIndexedColumnsCopy();
+        int initialLength =
+                planner.getNumberOfPrimaryKeys() == numberOfColumns ? INITIAL_NUMBER_OF_COLUMNS : numberOfColumns;
+        sizes = new int[initialLength];
+        // Number of columns is not known during the flush operation
+        this.numberOfColumns = 0;
+        currentContext = DefaultBufferCacheWriteContext.INSTANCE;
+    }
+
+    @Override
+    public void startWritingColumn(int columnIndex, boolean overlapping) {
+        if (drive.hasSpace() || indexedColumns.contains(columnIndex)) {
+            // The current column should be persisted locally if free disk space permits
+            currentContext = DefaultBufferCacheWriteContext.INSTANCE;
+        } else if (plan.get(columnIndex)) {
+            // This column was planned for eviction, do not persist.
+            if (overlapping && currentContext == DefaultBufferCacheWriteContext.INSTANCE) {
+                // The previous column's last page should be persisted AND it is overlapping with the current column's
+                // first page. Persist the first page locally and switch to cloud-only writer.
+                writeLocallyAndSwitchToCloudOnly = true;
+            } else {
+                // The previous column's last page is not overlapping. Switch to cloud-only writer (if not already)
+                currentContext = DefaultCloudOnlyWriteContext.INSTANCE;
+            }
+        } else {
+            // Local drive is pressured. Write to cloud only.
+            currentContext = DefaultCloudOnlyWriteContext.INSTANCE;
+        }
+    }
+
+    @Override
+    public void endWritingColumn(int columnIndex, int size) {
+        ensureCapacity(columnIndex);
+        sizes[columnIndex] = Math.max(sizes[columnIndex], size);
+    }
+
+    @Override
+    public void columnsPersisted() {
+        // Set the default writer context to persist pageZero and the interior nodes' pages locally
+        currentContext = DefaultBufferCacheWriteContext.INSTANCE;
+        writeLocallyAndSwitchToCloudOnly = false;
+    }
+
+    @Override
+    public void close() {
+        // Report the sizes of the written columns
+        planner.adjustColumnSizes(sizes, numberOfColumns);
+    }
+
+    /*
+     * ************************************************************************************************
+     * WRITE methods
+     * ************************************************************************************************
+     */
+
+    @Override
+    public int write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer data)
+            throws HyracksDataException {
+        int writtenBytes = currentContext.write(ioManager, handle, offset, data);
+        switchIfNeeded();
+        return writtenBytes;
+    }
+
+    @Override
+    public long write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer[] data)
+            throws HyracksDataException {
+        long writtenBytes = currentContext.write(ioManager, handle, offset, data);
+        switchIfNeeded();
+        return writtenBytes;
+    }
+
+    /*
+     * ************************************************************************************************
+     * helper methods
+     * ************************************************************************************************
+     */
+
+    private void ensureCapacity(int columnIndex) {
+        int length = sizes.length;
+        if (columnIndex >= length) {
+            sizes = IntArrays.grow(sizes, columnIndex + 1);
+        }
+
+        numberOfColumns = Math.max(numberOfColumns, columnIndex + 1);
+    }
+
+    private void switchIfNeeded() {
+        if (writeLocallyAndSwitchToCloudOnly) {
+            // Switch to cloud-only writer
+            currentContext = DefaultCloudOnlyWriteContext.INSTANCE;
+            writeLocallyAndSwitchToCloudOnly = false;
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepLockInfo.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepLockInfo.java
new file mode 100644
index 0000000..ed83e84
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepLockInfo.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import java.util.BitSet;
+
+import org.apache.hyracks.cloud.buffercache.page.ISweepLockInfo;
+
+public final class ColumnSweepLockInfo implements ISweepLockInfo {
+    private final BitSet lockedColumns;
+
+    public ColumnSweepLockInfo() {
+        lockedColumns = new BitSet();
+    }
+
+    /**
+     * Reset the lock with plan's columns
+     *
+     * @param plan contains the columns to be locked
+     */
+    void reset(BitSet plan) {
+        lockedColumns.clear();
+        lockedColumns.or(plan);
+    }
+
+    /**
+     * Clear and set the locked columns in the provided {@link BitSet}
+     *
+     * @param lockedColumns used to get the locked columns
+     */
+    public void getLockedColumns(BitSet lockedColumns) {
+        lockedColumns.clear();
+        lockedColumns.or(this.lockedColumns);
+    }
+
+    @Override
+    public boolean isLocked() {
+        return true;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java
new file mode 100644
index 0000000..9549f4f
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import static org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils.isMergedComponent;
+
+import java.util.BitSet;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTree;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import it.unimi.dsi.fastutil.ints.IntArrays;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import it.unimi.dsi.fastutil.longs.LongArrays;
+
+public final class ColumnSweepPlanner {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final double SIZE_WEIGHT = 0.3d;
+    private static final double LAST_ACCESS_WEIGHT = 1.0d - SIZE_WEIGHT;
+    private static final double INITIAL_PUNCHABLE_THRESHOLD = 0.7d;
+    private static final double PUNCHABLE_THRESHOLD_DECREMENT = 0.7d;
+    private static final int MAX_ITERATION_COUNT = 5;
+    private static final int REEVALUATE_PLAN_THRESHOLD = 50;
+
+    private final AtomicBoolean active;
+    private final int numberOfPrimaryKeys;
+    private final BitSet plan;
+    private final BitSet reevaluatedPlan;
+    private final IntSet indexedColumns;
+    private final ISweepClock clock;
+    private int numberOfColumns;
+    private long lastAccess;
+    private int maxSize;
+    private int[] sizes;
+    private long[] lastAccesses;
+
+    private double punchableThreshold;
+    private long lastSweepTs;
+    private int numberOfSweptColumns;
+    private int numberOfCloudRequests;
+
+    public ColumnSweepPlanner(int numberOfPrimaryKeys, ISweepClock clock) {
+        this.clock = clock;
+        active = new AtomicBoolean(false);
+        this.numberOfPrimaryKeys = numberOfPrimaryKeys;
+        sizes = new int[0];
+        lastAccesses = new long[0];
+        indexedColumns = new IntOpenHashSet();
+        plan = new BitSet();
+        reevaluatedPlan = new BitSet();
+        punchableThreshold = INITIAL_PUNCHABLE_THRESHOLD;
+    }
+
+    public boolean isActive() {
+        return active.get();
+    }
+
+    public int getNumberOfPrimaryKeys() {
+        return numberOfPrimaryKeys;
+    }
+
+    public void onActivate(int numberOfColumns, List<ILSMDiskComponent> diskComponents,
+            IColumnTupleProjector sweepProjector, IBufferCache bufferCache) throws HyracksDataException {
+        resizeStatsArrays(numberOfColumns);
+        setInitialSizes(diskComponents, sweepProjector, bufferCache);
+        active.set(true);
+    }
+
+    public void setIndexedColumns(IColumnProjectionInfo projectionInfo) {
+        indexedColumns.clear();
+        for (int i = 0; i < projectionInfo.getNumberOfProjectedColumns(); i++) {
+            int columnIndex = projectionInfo.getColumnIndex(i);
+            indexedColumns.add(columnIndex);
+        }
+    }
+
+    public IntSet getIndexedColumnsCopy() {
+        return new IntOpenHashSet(indexedColumns);
+    }
+
+    public synchronized void access(IColumnProjectionInfo projectionInfo, boolean hasSpace) {
+        resetPlanIfNeeded(hasSpace);
+        long accessTime = clock.getCurrentTime();
+        lastAccess = accessTime;
+        int numberOfColumns = projectionInfo.getNumberOfProjectedColumns();
+        boolean requireCloudAccess = false;
+        for (int i = 0; i < numberOfColumns; i++) {
+            int columnIndex = projectionInfo.getColumnIndex(i);
+            if (columnIndex >= 0) {
+                // columnIndex can be -1 when accessing a non-existing column (i.e., not known by the schema)
+                lastAccesses[columnIndex] = accessTime;
+                requireCloudAccess |= numberOfSweptColumns > 0 && plan.get(columnIndex);
+            }
+        }
+
+        numberOfCloudRequests += requireCloudAccess ? 1 : 0;
+    }
+
+    public synchronized void adjustColumnSizes(int[] newSizes, int numberOfColumns) {
+        resizeStatsArrays(numberOfColumns);
+        for (int i = 0; i < numberOfColumns; i++) {
+            int newSize = newSizes[i];
+            sizes[i] = Math.max(sizes[i], newSize);
+            maxSize = Math.max(maxSize, newSize);
+        }
+    }
+
+    public synchronized boolean plan() {
+        plan.clear();
+        int numberOfEvictableColumns = 0;
+        int iter = 0;
+        // Calculate weights: Ensure the plan contains new columns that never been swept
+        while (iter < MAX_ITERATION_COUNT && numberOfEvictableColumns < numberOfColumns) {
+            if (numberOfEvictableColumns > 0) {
+                // Do not reiterate if we found columns to evict
+                break;
+            }
+
+            // Find evictable columns
+            numberOfEvictableColumns += findEvictableColumns(plan);
+
+            // The next iteration/plan will be more aggressive
+            punchableThreshold *= PUNCHABLE_THRESHOLD_DECREMENT;
+            iter++;
+        }
+        // Register the plan time
+        lastSweepTs = clock.getCurrentTime();
+        // Add the number of evictable columns
+        numberOfSweptColumns += numberOfEvictableColumns;
+        if (numberOfEvictableColumns > 0) {
+            LOGGER.info("Planning to evict {} columns. The evictable columns are {}", numberOfEvictableColumns, plan);
+            return true;
+        }
+
+        LOGGER.info("Couldn't find columns to evict after {} iteration", iter);
+        return false;
+    }
+
+    public synchronized BitSet getPlanCopy() {
+        return (BitSet) plan.clone();
+    }
+
+    private double getWeight(int i, IntSet indexedColumns, int numberOfPrimaryKeys) {
+        if (i < numberOfPrimaryKeys || indexedColumns.contains(i)) {
+            return -1.0;
+        }
+
+        double sizeWeight = sizes[i] / (double) maxSize * SIZE_WEIGHT;
+        double lasAccessWeight = (lastAccess - lastAccesses[i]) / (double) lastAccess * LAST_ACCESS_WEIGHT;
+
+        return sizeWeight + lasAccessWeight;
+    }
+
+    private void resizeStatsArrays(int numberOfColumns) {
+        sizes = IntArrays.ensureCapacity(sizes, numberOfColumns);
+        lastAccesses = LongArrays.ensureCapacity(lastAccesses, numberOfColumns);
+        this.numberOfColumns = numberOfColumns - numberOfPrimaryKeys;
+    }
+
+    private void setInitialSizes(List<ILSMDiskComponent> diskComponents, IColumnTupleProjector sweepProjector,
+            IBufferCache bufferCache) throws HyracksDataException {
+        // This runs when activating an index (no need to synchronize on the opTracker)
+        if (diskComponents.isEmpty()) {
+            return;
+        }
+
+        IColumnProjectionInfo columnProjectionInfo =
+                ColumnSweeperUtil.createColumnProjectionInfo(diskComponents, sweepProjector);
+        ILSMDiskComponent latestComponent = diskComponents.get(0);
+        ILSMDiskComponent oldestComponent = diskComponents.get(diskComponents.size() - 1);
+        ColumnBTreeReadLeafFrame leafFrame = ColumnSweeperUtil.createLeafFrame(columnProjectionInfo, latestComponent);
+        ColumnRanges ranges = new ColumnRanges(columnProjectionInfo.getNumberOfPrimaryKeys());
+        // Get the column sizes from the freshest component, which has the columns of the most recent schema
+        setColumnSizes(latestComponent, leafFrame, ranges, bufferCache);
+        if (isMergedComponent(oldestComponent.getId())) {
+            // Get the column sizes from the oldest merged component, which probably has the largest columns
+            setColumnSizes(oldestComponent, leafFrame, ranges, bufferCache);
+        }
+    }
+
+    private void setColumnSizes(ILSMDiskComponent diskComponent, ColumnBTreeReadLeafFrame leafFrame,
+            ColumnRanges ranges, IBufferCache bufferCache) throws HyracksDataException {
+        ColumnBTree columnBTree = (ColumnBTree) diskComponent.getIndex();
+        long dpid = BufferedFileHandle.getDiskPageId(columnBTree.getFileId(), columnBTree.getBulkloadLeafStart());
+        ICachedPage page = bufferCache.pin(dpid);
+        try {
+            leafFrame.setPage(page);
+            ranges.reset(leafFrame);
+            for (int i = 0; i < leafFrame.getNumberOfColumns(); i++) {
+                sizes[i] = Math.max(sizes[i], ranges.getColumnLength(i));
+                maxSize = Math.max(maxSize, sizes[i]);
+            }
+        } finally {
+            bufferCache.unpin(page);
+        }
+    }
+
+    private int findEvictableColumns(BitSet plan) {
+        int numberOfEvictableColumns = 0;
+        for (int i = 0; i < sizes.length; i++) {
+            if (!plan.get(i) && getWeight(i, indexedColumns, numberOfPrimaryKeys) >= punchableThreshold) {
+                // Column reached a punchable threshold; include it in the eviction plan.
+                plan.set(i);
+                numberOfEvictableColumns++;
+            }
+        }
+
+        return numberOfEvictableColumns;
+    }
+
+    private void resetPlanIfNeeded(boolean hasSpace) {
+        if (!hasSpace || numberOfCloudRequests < REEVALUATE_PLAN_THRESHOLD) {
+            return;
+        }
+
+        numberOfCloudRequests = 0;
+        reevaluatedPlan.clear();
+        int numberOfEvictableColumns = findEvictableColumns(reevaluatedPlan);
+        for (int i = 0; i < numberOfEvictableColumns; i++) {
+            int columnIndex = reevaluatedPlan.nextSetBit(i);
+            if (!plan.get(columnIndex)) {
+                // the plan contains a stale column. Invalidate!
+                plan.clear();
+                plan.or(reevaluatedPlan);
+                LOGGER.info("Re-planning to evict {} columns. The newly evictable columns are {}",
+                        numberOfEvictableColumns, plan);
+                break;
+            }
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
new file mode 100644
index 0000000..4932bf0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import static org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState.READABLE_UNWRITABLE;
+import static org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils.isMergedComponent;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
+import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.sweeper.SweepContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTree;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.LSMColumnBTree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.annotations.CriticalPath;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public final class ColumnSweeper {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final ColumnSweepLockInfo lockedColumns;
+    private final ColumnRanges ranges;
+    private final List<ILSMDiskComponent> sweepableComponents;
+
+    public ColumnSweeper(int numberOfPrimaryKeys) {
+        lockedColumns = new ColumnSweepLockInfo();
+        ranges = new ColumnRanges(numberOfPrimaryKeys);
+        sweepableComponents = new ArrayList<>();
+    }
+
+    public long sweep(BitSet plan, SweepContext context, IColumnTupleProjector sweepProjector)
+            throws HyracksDataException {
+        IndexUnit indexUnit = context.getIndexUnit();
+        LSMColumnBTree lsmColumnBTree = (LSMColumnBTree) indexUnit.getIndex();
+        IColumnProjectionInfo projectionInfo = captureSweepableComponents(lsmColumnBTree, sweepProjector);
+        if (projectionInfo == null) {
+            // no sweepable components
+            return 0L;
+        }
+
+        LOGGER.info("Sweeping {}", lsmColumnBTree);
+        ILSMDiskComponent latestComponent = sweepableComponents.get(0);
+        ColumnBTreeReadLeafFrame leafFrame = ColumnSweeperUtil.createLeafFrame(projectionInfo, latestComponent);
+        IBufferCacheReadContext bcOpCtx = SweepBufferCacheReadContext.INSTANCE;
+        lockedColumns.reset(plan);
+        long freedSpace = 0L;
+        for (int i = 0; i < sweepableComponents.size(); i++) {
+            if (context.stopSweeping()) {
+                // Exit as the index is being dropped
+                return 0L;
+            }
+
+            boolean failed = false;
+            // Components are entered one at a time to allow components to be merged and deactivated
+            ILSMDiskComponent diskComponent = enterAndGetComponent(i, lsmColumnBTree);
+            // If diskComponent is null, that means it is not a viable candidate anymore
+            if (diskComponent != null) {
+                try {
+                    freedSpace += sweepDiskComponent(leafFrame, diskComponent, plan, context, bcOpCtx);
+                } catch (Throwable e) {
+                    failed = true;
+                    throw e;
+                } finally {
+                    exitComponent(diskComponent, lsmColumnBTree, failed);
+                }
+            }
+        }
+
+        LOGGER.info("Swept {} components and freed {} from disk", sweepableComponents.size(),
+                StorageUtil.toHumanReadableSize(freedSpace));
+        return freedSpace;
+    }
+
+    @CriticalPath
+    private IColumnProjectionInfo captureSweepableComponents(LSMColumnBTree lsmColumnBTree,
+            IColumnTupleProjector sweepProjector) throws HyracksDataException {
+        ILSMOperationTracker opTracker = lsmColumnBTree.getOperationTracker();
+        sweepableComponents.clear();
+        synchronized (opTracker) {
+            List<ILSMDiskComponent> diskComponents = lsmColumnBTree.getDiskComponents();
+            for (int i = 0; i < diskComponents.size(); i++) {
+                ILSMDiskComponent diskComponent = diskComponents.get(i);
+                /*
+                 * Get components that are only in READABLE_UNWRITABLE state. Components that are currently being
+                 * merged should not be swept as they will be deleted anyway.
+                 * Also, only sweep merged components as flushed components are relatively smaller and should be
+                 * merged eventually. So, it is preferable to read everything locally when merging flushed components.
+                 * TODO should we sweep flushed components?
+                 */
+                if (isMergedComponent(diskComponent.getId()) && diskComponent.getState() == READABLE_UNWRITABLE
+                        && diskComponent.getComponentSize() > 0) {
+                    // The component is a good candidate to be swept
+                    sweepableComponents.add(diskComponent);
+                }
+            }
+
+            if (sweepableComponents.isEmpty()) {
+                // No sweepable components
+                return null;
+            }
+
+            return ColumnSweeperUtil.createColumnProjectionInfo(sweepableComponents, sweepProjector);
+        }
+    }
+
+    private ILSMDiskComponent enterAndGetComponent(int index, LSMColumnBTree lsmColumnBTree)
+            throws HyracksDataException {
+        ILSMDiskComponent diskComponent = sweepableComponents.get(index);
+        synchronized (lsmColumnBTree.getOperationTracker()) {
+            // Make sure the component is still in READABLE_UNWRITABLE state
+            if (diskComponent.getState() == READABLE_UNWRITABLE
+                    && diskComponent.threadEnter(LSMOperationType.DISK_COMPONENT_SCAN, false)) {
+                return diskComponent;
+            }
+        }
+
+        // the component is not a viable candidate anymore
+        return null;
+    }
+
+    private void exitComponent(ILSMDiskComponent diskComponent, LSMColumnBTree lsmColumnBTree, boolean failed)
+            throws HyracksDataException {
+        synchronized (lsmColumnBTree.getOperationTracker()) {
+            diskComponent.threadExit(LSMOperationType.DISK_COMPONENT_SCAN, failed, false);
+        }
+    }
+
+    private long sweepDiskComponent(ColumnBTreeReadLeafFrame leafFrame, ILSMDiskComponent diskComponent, BitSet plan,
+            SweepContext context, IBufferCacheReadContext bcOpCtx) throws HyracksDataException {
+        long dpid = getFirstPageId(diskComponent);
+        int fileId = BufferedFileHandle.getFileId(dpid);
+        int nextPageId = BufferedFileHandle.getPageId(dpid);
+        int freedSpace = 0;
+        context.open(fileId);
+        try {
+            while (nextPageId >= 0) {
+                if (context.stopSweeping()) {
+                    // Exit as the index is being dropped
+                    return 0L;
+                }
+                CloudCachedPage page0 = context.pin(BufferedFileHandle.getDiskPageId(fileId, nextPageId), bcOpCtx);
+                boolean columnsLocked = false;
+                try {
+                    leafFrame.setPage(page0);
+                    nextPageId = leafFrame.getNextLeaf();
+                    columnsLocked = page0.trySweepLock(lockedColumns);
+                    if (columnsLocked) {
+                        leafFrame.setPage(page0);
+                        ranges.reset(leafFrame, plan);
+                        freedSpace += punchHoles(context, leafFrame);
+                    }
+                } finally {
+                    if (columnsLocked) {
+                        page0.sweepUnlock();
+                    }
+                    context.unpin(page0, bcOpCtx);
+                }
+            }
+        } finally {
+            context.close();
+        }
+
+        return freedSpace;
+    }
+
+    private long getFirstPageId(ILSMDiskComponent diskComponent) {
+        ColumnBTree columnBTree = (ColumnBTree) diskComponent.getIndex();
+        int fileId = columnBTree.getFileId();
+        int firstPage = columnBTree.getBulkloadLeafStart();
+        return BufferedFileHandle.getDiskPageId(fileId, firstPage);
+    }
+
+    private int punchHoles(SweepContext context, ColumnBTreeReadLeafFrame leafFrame) throws HyracksDataException {
+        int freedSpace = 0;
+        int numberOfPages = leafFrame.getMegaLeafNodeNumberOfPages();
+        int pageZeroId = leafFrame.getPageId();
+
+        // Start from 1 as we do not evict pageZero
+        BitSet nonEvictablePages = ranges.getNonEvictablePages();
+        int start = nonEvictablePages.nextClearBit(1);
+        while (start < numberOfPages) {
+            int end = nonEvictablePages.nextSetBit(start);
+            int numberOfEvictablePages = end - start;
+            freedSpace += context.punchHole(pageZeroId + start, numberOfEvictablePages);
+
+            start = nonEvictablePages.nextClearBit(end);
+        }
+
+        return freedSpace;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeperUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeperUtil.java
new file mode 100644
index 0000000..e51f1d7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeperUtil.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTree;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeLeafFrameFactory;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+
+public class ColumnSweeperUtil {
+    private ColumnSweeperUtil() {
+    }
+
+    public static final BitSet EMPTY = new BitSet();
+
+    static IColumnProjectionInfo createColumnProjectionInfo(List<ILSMDiskComponent> diskComponents,
+            IColumnTupleProjector projector) throws HyracksDataException {
+        // Get the newest disk component, which has the newest column metadata
+        ILSMDiskComponent newestComponent = diskComponents.get(0);
+        IValueReference columnMetadata = ColumnUtil.getColumnMetadataCopy(newestComponent.getMetadata());
+
+        return projector.createProjectionInfo(columnMetadata);
+    }
+
+    static ColumnBTreeReadLeafFrame createLeafFrame(IColumnProjectionInfo projectionInfo,
+            ILSMDiskComponent diskComponent) {
+        ColumnBTree columnBTree = (ColumnBTree) diskComponent.getIndex();
+        ColumnBTreeLeafFrameFactory leafFrameFactory = (ColumnBTreeLeafFrameFactory) columnBTree.getLeafFrameFactory();
+        return leafFrameFactory.createReadFrame(projectionInfo);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ISweepClock.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ISweepClock.java
new file mode 100644
index 0000000..f36a51d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ISweepClock.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+@FunctionalInterface
+public interface ISweepClock {
+    long getCurrentTime();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/SweepBufferCacheReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/SweepBufferCacheReadContext.java
new file mode 100644
index 0000000..7b51d55
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/SweepBufferCacheReadContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import static org.apache.hyracks.cloud.buffercache.context.DefaultCloudReadContext.readAndPersistPage;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+@ThreadSafe
+final class SweepBufferCacheReadContext implements IBufferCacheReadContext {
+    static final IBufferCacheReadContext INSTANCE = new SweepBufferCacheReadContext();
+
+    private SweepBufferCacheReadContext() {
+    }
+
+    @Override
+    public void onPin(ICachedPage page) {
+        // NoOp
+    }
+
+    @Override
+    public void onUnpin(ICachedPage page) {
+        // NoOp
+    }
+
+    @Override
+    public boolean isNewPage() {
+        return false;
+    }
+
+    @Override
+    public boolean incrementStats() {
+        // Do not increment the stats for the sweeper
+        return false;
+    }
+
+    @Override
+    public ByteBuffer processHeader(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
+            CachedPage cPage) throws HyracksDataException {
+        // Will not persist as the disk is pressured
+        return readAndPersistPage(ioManager, fileHandle, header, cPage, false);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlannerTest.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlannerTest.java
new file mode 100644
index 0000000..6bace98
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlannerTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.QUERY;
+import static org.apache.hyracks.util.StorageUtil.getIntSizeInBytes;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Random;
+
+import org.apache.hyracks.storage.am.lsm.btree.column.dummy.DummyColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.dummy.DummySweepClock;
+import org.apache.hyracks.util.StorageUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+
+public class ColumnSweepPlannerTest {
+    private static final int MAX_MEGA_LEAF_NODE_SIZE = getIntSizeInBytes(10, StorageUtil.StorageUnit.MEGABYTE);
+    private static final Random RANDOM = new Random(0);
+    private final DummySweepClock clock = new DummySweepClock();
+
+    @Test
+    public void test10Columns() {
+        int numberOfPrimaryKeys = 1;
+        int numberOfColumns = numberOfPrimaryKeys + 10;
+        int[] columnSizes = createNormalColumnSizes(numberOfPrimaryKeys, numberOfColumns);
+        ColumnSweepPlanner planner = new ColumnSweepPlanner(numberOfPrimaryKeys, clock);
+        IntList projectedColumns = new IntArrayList();
+        DummyColumnProjectionInfo info = new DummyColumnProjectionInfo(numberOfPrimaryKeys, QUERY, projectedColumns);
+
+        // Adjust sizes
+        planner.adjustColumnSizes(columnSizes, numberOfColumns);
+
+        // Project 3 columns
+        projectedColumns(numberOfPrimaryKeys, numberOfColumns, 3, projectedColumns);
+        // access the projected columns (max 10 times)
+        access(planner, info, true, 10);
+
+        // Advance clock
+        clock.advance(10);
+
+        // Plan for eviction
+        BitSet keptColumns = new BitSet();
+        planner.plan();
+        computeKeptColumns(planner.getPlanCopy(), keptColumns, numberOfColumns);
+
+        // Project another 3 columns
+        projectedColumns(numberOfPrimaryKeys, numberOfColumns, 3, projectedColumns);
+        // access the projected columns
+        access(planner, info, true, 100);
+
+        // At this point, the plan should change
+        BitSet newKeptColumns = new BitSet();
+        computeKeptColumns(planner.getPlanCopy(), newKeptColumns, numberOfColumns);
+
+        Assert.assertNotEquals(keptColumns, newKeptColumns);
+    }
+
+    private void computeKeptColumns(BitSet plan, BitSet keptColumns, int numberOfColumns) {
+        keptColumns.clear();
+        for (int i = 0; i < numberOfColumns; i++) {
+            if (!plan.get(i)) {
+                keptColumns.set(i);
+            }
+        }
+
+        System.out.println("Kept columns: " + keptColumns);
+    }
+
+    private void access(ColumnSweepPlanner planner, DummyColumnProjectionInfo info, boolean hasSpace, int bound) {
+        int numberOfAccesses = RANDOM.nextInt(1, bound);
+        for (int i = 0; i < numberOfAccesses; i++) {
+            planner.access(info, hasSpace);
+            clock.advance(1);
+        }
+
+        System.out.println("Accessed: " + info + " " + numberOfAccesses + " time");
+    }
+
+    private void projectedColumns(int numberPrimaryKeys, int numberOfColumns, int numberOfProjectedColumns,
+            IntList projectedColumns) {
+        IntSet alreadyProjectedColumns = new IntOpenHashSet();
+        projectedColumns.clear();
+        for (int i = 0; i < numberOfProjectedColumns; i++) {
+            int columnIndex = RANDOM.nextInt(numberPrimaryKeys, numberOfColumns);
+            while (alreadyProjectedColumns.contains(columnIndex)) {
+                columnIndex = RANDOM.nextInt(numberPrimaryKeys, numberOfColumns);
+            }
+            projectedColumns.add(columnIndex);
+            alreadyProjectedColumns.add(columnIndex);
+        }
+    }
+
+    private int[] createNormalColumnSizes(int numberOfPrimaryKeys, int numberOfColumns) {
+        int[] columnSizes = new int[numberOfColumns];
+        double[] normalDistribution = new double[numberOfColumns];
+        double sum = 0.0d;
+        for (int i = 0; i < numberOfColumns; i++) {
+            double value = Math.abs(RANDOM.nextGaussian());
+            normalDistribution[i] = value;
+            sum += value;
+        }
+
+        for (int i = numberOfPrimaryKeys; i < numberOfColumns; i++) {
+            int size = (int) Math.round((normalDistribution[i] / sum) * MAX_MEGA_LEAF_NODE_SIZE);
+            columnSizes[i] = size;
+        }
+
+        System.out.println("Column sizes:");
+        for (int i = 0; i < numberOfColumns; i++) {
+            System.out.println(i + ": " + StorageUtil.toHumanReadableSize(columnSizes[i]));
+        }
+        System.out.println("TotalSize:" + StorageUtil.toHumanReadableSize(Arrays.stream(columnSizes).sum()));
+        return columnSizes;
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummyColumnProjectionInfo.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummyColumnProjectionInfo.java
new file mode 100644
index 0000000..3b9d6ba
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummyColumnProjectionInfo.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.dummy;
+
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+
+import it.unimi.dsi.fastutil.ints.IntList;
+
+public class DummyColumnProjectionInfo implements IColumnProjectionInfo {
+    private final int numberOfPrimaryKeys;
+    private final ColumnProjectorType projectorType;
+    private final IntList projectedColumns;
+
+    public DummyColumnProjectionInfo(int numberOfPrimaryKeys, ColumnProjectorType projectorType,
+            IntList projectedColumns) {
+        this.numberOfPrimaryKeys = numberOfPrimaryKeys;
+        this.projectorType = projectorType;
+        this.projectedColumns = projectedColumns;
+    }
+
+    @Override
+    public int getColumnIndex(int ordinal) {
+        return projectedColumns.getInt(ordinal);
+    }
+
+    @Override
+    public int getNumberOfProjectedColumns() {
+        return projectedColumns.size();
+    }
+
+    @Override
+    public int getNumberOfPrimaryKeys() {
+        return numberOfPrimaryKeys;
+    }
+
+    @Override
+    public int getFilteredColumnIndex(int ordinal) {
+        return -1;
+    }
+
+    @Override
+    public int getNumberOfFilteredColumns() {
+        return 0;
+    }
+
+    @Override
+    public ColumnProjectorType getProjectorType() {
+        return projectorType;
+    }
+
+    @Override
+    public String toString() {
+        return projectedColumns.toString();
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummySweepClock.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummySweepClock.java
new file mode 100644
index 0000000..6f209ec
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummySweepClock.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.dummy;
+
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ISweepClock;
+
+public class DummySweepClock implements ISweepClock {
+    long timestamp;
+
+    @Override
+    public long getCurrentTime() {
+        return timestamp;
+    }
+
+    public void advance(long delta) {
+        timestamp += delta;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java
index 1779527..98d2d73 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java
@@ -55,15 +55,17 @@
     }
 
     public static void persist(ILSMComponentId id, IComponentMetadata metadata) throws HyracksDataException {
-        LSMComponentId componentId = (LSMComponentId) id;
-        metadata.put(COMPONENT_ID_MIN_KEY, LongPointable.FACTORY.createPointable(componentId.getMinId()));
-        metadata.put(COMPONENT_ID_MAX_KEY, LongPointable.FACTORY.createPointable(componentId.getMaxId()));
+        metadata.put(COMPONENT_ID_MIN_KEY, LongPointable.FACTORY.createPointable(id.getMinId()));
+        metadata.put(COMPONENT_ID_MAX_KEY, LongPointable.FACTORY.createPointable(id.getMaxId()));
     }
 
     public static ILSMComponentId union(ILSMComponentId id1, ILSMComponentId id2) {
-        long minId = Long.min(((LSMComponentId) id1).getMinId(), ((LSMComponentId) id2).getMinId());
-        long maxId = Long.max(((LSMComponentId) id1).getMaxId(), ((LSMComponentId) id2).getMaxId());
+        long minId = Long.min(id1.getMinId(), id2.getMinId());
+        long maxId = Long.max(id1.getMaxId(), id2.getMaxId());
         return new LSMComponentId(minId, maxId);
     }
 
+    public static boolean isMergedComponent(ILSMComponentId id) {
+        return id.getMinId() < id.getMaxId();
+    }
 }