[ASTERIXDB-3601][STO] Supporting multi-page zeroes

- user model changes: no
- storage format changes: yes
- interface changes: yes

Details:
This patch introduces page zero writers that can span
multiple page zero segments, making it possible to
support a very large number of columns.
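
For intuition, a minimal sketch (illustrative only, not code from this
patch; the helper and parameter names are hypothetical) of the segment
accounting this enables, assuming each extra page zero segment holds a
fixed number of fixed-size column entries:

    // Hypothetical helper: number of page zero segments a batch would need.
    static int requiredPageZeroSegments(int presentColumns, int zerothSegmentMaxColumns, int columnsPerSegment) {
        // Columns that do not fit in the zeroth segment spill into extra segments.
        int spilled = Math.max(0, presentColumns - zerothSegmentMaxColumns);
        return 1 + (int) Math.ceil((double) spilled / columnsPerSegment);
    }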

Ext-ref: MB-66306
Change-Id: I0db8588d9a05331a6542b9bc1f3ca1c342ca70a7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19987
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Ritik Raj <ritik.raj@couchbase.com>
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiPageZeroByteBuffersReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiPageZeroByteBuffersReader.java
new file mode 100644
index 0000000..bb02166
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiPageZeroByteBuffersReader.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.in;
+
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter;
+import org.apache.asterix.column.zero.writers.SparseColumnPageZeroWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IntPairUtil;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.tuples.ColumnMultiPageZeroBufferProvider;
+
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+
+public final class MultiPageZeroByteBuffersReader {
+    private static final ByteBuffer EMPTY;
+    private ColumnMultiPageZeroBufferProvider bufferProvider;
+    private final Int2IntMap segmentDir; // maps a segment index to its slot in the buffers list
+    private int maxBuffersSize;
+
+    static {
+        EMPTY = ByteBuffer.allocate(0);
+        EMPTY.limit(0);
+    }
+
+    private final List<ByteBuffer> buffers;
+
+    public MultiPageZeroByteBuffersReader() {
+        this.buffers = new ArrayList<>();
+        segmentDir = new Int2IntOpenHashMap();
+        segmentDir.defaultReturnValue(-1);
+    }
+
+    public void reset(IColumnBufferProvider bufferProvider) throws HyracksDataException {
+        ColumnMultiPageZeroBufferProvider pageZeroBufferProvider = (ColumnMultiPageZeroBufferProvider) bufferProvider;
+        reset();
+        this.bufferProvider = pageZeroBufferProvider;
+        maxBuffersSize = pageZeroBufferProvider.getNumberOfRemainingPages();
+        pageZeroBufferProvider.readAll(buffers, segmentDir);
+    }
+
+    public void read(int segmentIndex, IPointable pointable, int position, int length)
+            throws EOFException, HyracksDataException {
+        if (segmentIndex < 0 || segmentIndex >= maxBuffersSize) {
+            throw new IndexOutOfBoundsException("Segment index out of bounds: " + segmentIndex);
+        }
+
+        int bufferIndex = segmentDir.get(segmentIndex);
+        if (bufferIndex == -1) {
+            // Fill up the buffer lazily: this segment page was not pinned (with the
+            // DefaultReadContext the pages are expected to be on disk), so read the
+            // required segment now and cache its buffer.
+            ByteBuffer buffer = bufferProvider.read(segmentIndex);
+            segmentDir.put(segmentIndex, buffers.size());
+            bufferIndex = buffers.size();
+            buffers.add(buffer);
+        }
+        ByteBuffer buffer = buffers.get(bufferIndex);
+        pointable.set(buffer.array(), position, length);
+    }
+
+    public void readOffset(long[] offsetColumnIndexPairs, int maxColumnsInZerothSegment, int numberOfColumnsInAPage) {
+        int numberOfColumns = offsetColumnIndexPairs.length - 1;
+        for (Int2IntMap.Entry pair : segmentDir.int2IntEntrySet()) {
+            int segmentIndex = pair.getIntKey();
+            int bufferIndex = pair.getIntValue();
+            ByteBuffer buffer = buffers.get(bufferIndex);
+            int columnIndex = maxColumnsInZerothSegment + segmentIndex * numberOfColumnsInAPage;
+            int segmentOffset = 0;
+            for (int j = 0; j < numberOfColumnsInAPage; j++) {
+                int columnOffset = buffer.getInt(segmentOffset);
+                offsetColumnIndexPairs[columnIndex] = IntPairUtil.of(columnOffset, columnIndex);
+                segmentOffset += DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+                columnIndex++;
+                if (columnIndex == numberOfColumns) {
+                    break; // No need to read more columns from this buffer.
+                }
+            }
+        }
+    }
+
+    public void readSparseOffset(long[] offsetColumnIndexPairs, int numberOfPageSegments, int numberOfColumnsInAPage,
+            int numberOfColumnsInLastSegment) {
+        for (Int2IntMap.Entry pair : segmentDir.int2IntEntrySet()) {
+            int segmentIndex = pair.getIntKey();
+            int bufferIndex = pair.getIntValue();
+            ByteBuffer buffer = buffers.get(bufferIndex);
+            int segmentOffset = 0;
+            int numberOfColumnsInSegment =
+                    segmentIndex == numberOfPageSegments - 2 ? numberOfColumnsInLastSegment : numberOfColumnsInAPage;
+            for (int j = 0; j < numberOfColumnsInSegment; j++) {
+                int columnIndex = buffer.getInt(segmentOffset);
+                int columnOffset = buffer.getInt(segmentOffset + Integer.BYTES);
+                offsetColumnIndexPairs[columnIndex] = IntPairUtil.of(columnOffset, columnIndex);
+                segmentOffset += SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+            }
+        }
+    }
+
+    public void readAllColumns(BitSet presentColumns, int numberOfPageSegments, int numberOfColumnsInAPage,
+            int numberOfColumnsInLastSegment) {
+        final int stride = SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+        final int lastSegmentIndex = numberOfPageSegments - 2;
+
+        for (Int2IntMap.Entry entry : segmentDir.int2IntEntrySet()) {
+            final int segmentIndex = entry.getIntKey();
+            final int bufferIndex = entry.getIntValue();
+            final ByteBuffer buffer = buffers.get(bufferIndex);
+
+            final int columnsInSegment =
+                    (segmentIndex == lastSegmentIndex) ? numberOfColumnsInLastSegment : numberOfColumnsInAPage;
+
+            int offset = 0;
+            int limit = columnsInSegment * stride;
+
+            while (offset < limit) {
+                presentColumns.set(buffer.getInt(offset));
+                offset += stride;
+            }
+        }
+    }
+
+    public int findColumnIndexInSegment(int segmentIndex, int columnIndex, int numberOfColumnsInSegment)
+            throws HyracksDataException {
+        if (segmentIndex < 0 || segmentIndex >= maxBuffersSize) {
+            throw new IndexOutOfBoundsException("Segment index out of bounds: " + segmentIndex);
+        }
+        int bufferIndex = segmentDir.get(segmentIndex);
+        if (bufferIndex == -1) {
+            // Fill up the buffer lazily: this segment page was not pinned (with the
+            // DefaultReadContext the pages are expected to be on disk), so read the
+            // required segment now and cache its buffer.
+            ByteBuffer buffer = bufferProvider.read(segmentIndex);
+            segmentDir.put(segmentIndex, buffers.size());
+            bufferIndex = buffers.size();
+            buffers.add(buffer);
+        }
+        ByteBuffer buffer = buffers.get(bufferIndex);
+        int start = 0;
+        int end = numberOfColumnsInSegment - 1;
+        while (start <= end) {
+            int mid = start + (end - start) / 2;
+            int midColumnIndex = buffer.getInt(mid * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE);
+            if (midColumnIndex == columnIndex) {
+                return mid; // found the column index
+            } else if (midColumnIndex < columnIndex) {
+                start = mid + 1;
+            } else {
+                end = mid - 1;
+            }
+        }
+
+        return -1;
+    }
+
+    public void reset() {
+        buffers.clear();
+        maxBuffersSize = 0;
+        segmentDir.clear();
+    }
+}
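For reference, a self-contained sketch of the lookup that findColumnIndexInSegment
performs over one sparse page zero segment; the class, constant, and sample data
below are illustrative assumptions, not part of this patch:

    import java.nio.ByteBuffer;

    public final class SegmentLookupSketch {
        // A sparse segment stores (columnIndex, offset) int pairs sorted by columnIndex.
        static final int PAIR_SIZE = 2 * Integer.BYTES;

        static int findSlot(ByteBuffer segment, int columnsInSegment, int columnIndex) {
            int lo = 0;
            int hi = columnsInSegment - 1;
            while (lo <= hi) {
                int mid = (lo + hi) >>> 1;
                int midColumn = segment.getInt(mid * PAIR_SIZE);
                if (midColumn == columnIndex) {
                    return mid;
                } else if (midColumn < columnIndex) {
                    lo = mid + 1;
                } else {
                    hi = mid - 1;
                }
            }
            return -1; // column absent from this segment
        }

        public static void main(String[] args) {
            ByteBuffer segment = ByteBuffer.allocate(3 * PAIR_SIZE);
            segment.putInt(2).putInt(100); // column 2 at offset 100
            segment.putInt(5).putInt(240); // column 5 at offset 240
            segment.putInt(9).putInt(512); // column 9 at offset 512
            int slot = findSlot(segment, 3, 5);
            System.out.println(slot >= 0 ? "offset=" + segment.getInt(slot * PAIR_SIZE + Integer.BYTES) : "missing");
        }
    }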
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractMultiBufferBytesOutputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractMultiBufferBytesOutputStream.java
index 4b7c835..0c311f1 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractMultiBufferBytesOutputStream.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractMultiBufferBytesOutputStream.java
@@ -81,6 +81,34 @@
         }
     }
 
+    public void writeInSegment(int bufferIndex, int off, int val) throws IOException {
+        if (bufferIndex >= buffers.size()) {
+            int requiredBuffers = bufferIndex - buffers.size() + 1;
+            allocateBuffers(requiredBuffers);
+        }
+        ByteBuffer buffer = buffers.get(bufferIndex);
+        buffer.putInt(off, val);
+    }
+
+    public void writeInSegment(int bufferIndex, int off, int val1, int val2) throws IOException {
+        if (bufferIndex >= buffers.size()) {
+            int requiredBuffers = bufferIndex - buffers.size() + 1;
+            allocateBuffers(requiredBuffers);
+        }
+        ByteBuffer buffer = buffers.get(bufferIndex);
+        buffer.putInt(off, val1);
+        buffer.putInt(off + Integer.BYTES, val2);
+    }
+
+    public void writeInSegment(int bufferIndex, int off, long val) throws IOException {
+        if (bufferIndex >= buffers.size()) {
+            int requiredBuffers = bufferIndex - buffers.size() + 1;
+            allocateBuffers(requiredBuffers);
+        }
+        ByteBuffer buffer = buffers.get(bufferIndex);
+        buffer.putLong(off, val);
+    }
+
     @Override
     public void reserveByte(IReservedPointer pointer) throws IOException {
         ensureCapacity(Byte.BYTES);
@@ -161,4 +189,10 @@
         allocatedBytes += size;
         return size;
     }
+
+    protected void allocateBuffers(int count) throws HyracksDataException {
+        for (int i = 0; i < count; i++) {
+            allocateBuffer();
+        }
+    }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiPersistentPageZeroBufferBytesOutputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiPersistentPageZeroBufferBytesOutputStream.java
new file mode 100644
index 0000000..c156e49
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiPersistentPageZeroBufferBytesOutputStream.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.out;
+
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+
+public final class MultiPersistentPageZeroBufferBytesOutputStream extends AbstractMultiBufferBytesOutputStream {
+    public MultiPersistentPageZeroBufferBytesOutputStream(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+        super(multiPageOpRef);
+    }
+
+    @Override
+    protected ByteBuffer confiscateNewBuffer() throws HyracksDataException {
+        return multiPageOpRef.getValue().confiscatePageZeroPersistent();
+    }
+
+    public void reset(int requiredPageSegments) throws HyracksDataException {
+        preReset();
+        allocateBuffers(requiredPageSegments); // one buffer per page zero segment is required
+    }
+
+    @Override
+    protected void preReset() {
+        if (allocatedBytes > 0) {
+            // This should never happen for pageZero segments:
+            // the stream must already be finished after the flush.
+            throw new IllegalStateException("PageZero segments should already be finished after flush");
+        }
+    }
+
+    @Override
+    public void writeTo(OutputStream outputStream) {
+        throw new IllegalAccessError("Persistent stream cannot be written to another stream");
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/filter/FilterAccessorProvider.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/filter/FilterAccessorProvider.java
index e14fce3..218b2ba 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/filter/FilterAccessorProvider.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/filter/FilterAccessorProvider.java
@@ -139,16 +139,15 @@
     }
 
     public static void setFilterValues(List<IColumnRangeFilterValueAccessor> filterValueAccessors,
-            ColumnBTreeReadLeafFrame frame) {
+            ColumnBTreeReadLeafFrame frame) throws HyracksDataException {
         IColumnPageZeroReader pageZeroReader = frame.getColumnPageZeroReader();
         for (int i = 0; i < filterValueAccessors.size(); i++) {
             ColumnRangeFilterValueAccessor accessor = (ColumnRangeFilterValueAccessor) filterValueAccessors.get(i);
             int columnIndex = accessor.getColumnIndex();
             long normalizedValue;
             if (pageZeroReader.isValidColumn(columnIndex)) {
-                int filterOffset = pageZeroReader.getColumnFilterOffset(columnIndex);
-                normalizedValue = accessor.isMin() ? pageZeroReader.getLong(filterOffset)
-                        : pageZeroReader.getLong(filterOffset + Long.BYTES);
+                normalizedValue = accessor.isMin() ? pageZeroReader.getColumnFilterMin(columnIndex)
+                        : pageZeroReader.getColumnFilterMax(columnIndex);
             } else {
                 // Column is missing
                 normalizedValue = accessor.isMin() ? Long.MAX_VALUE : Long.MIN_VALUE;
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
index 53a6375..46dae1b 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.column.operation.lsm.flush;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.BitSet;
 
 import org.apache.asterix.column.values.IColumnValuesWriter;
@@ -29,6 +28,8 @@
 import org.apache.asterix.column.zero.PageZeroWriterFlavorSelector;
 import org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter;
 import org.apache.asterix.column.zero.writers.SparseColumnPageZeroWriter;
+import org.apache.asterix.column.zero.writers.multipage.DefaultColumnMultiPageZeroWriter;
+import org.apache.asterix.column.zero.writers.multipage.SparseColumnMultiPageZeroWriter;
 import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
 import org.apache.asterix.om.lazy.TypedRecordLazyVisitablePointable;
 import org.apache.commons.lang3.mutable.Mutable;
@@ -144,18 +145,37 @@
     }
 
     @Override
-    public int getColumnOccupiedSpace(boolean includeCurrentTupleColumns) {
-        int presentColumns = transformerForCurrentTuple.getNumberOfVisitedColumnsInBatch();
+    public int getPageZeroWriterOccupiedSpace(int maxColumnsInPageZerothSegment, boolean includeCurrentTupleColumns,
+            boolean adaptive) {
         int totalNumberOfColumns = getAbsoluteNumberOfColumns(includeCurrentTupleColumns);
+        totalNumberOfColumns = Math.min(totalNumberOfColumns, maxColumnsInPageZerothSegment);
+
+        int spaceOccupiedByDefaultWriter = DefaultColumnMultiPageZeroWriter.EXTENDED_HEADER_SIZE + totalNumberOfColumns
+                * (DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
+
+        if (!adaptive) {
+            // not adaptive: always use the default multi-page writer
+            return spaceOccupiedByDefaultWriter;
+        }
+
+        // Maximum space occupied by the columns = maxColumnsInPageZerothSegment * (offset + filter size)
+        int spaceOccupiedBySparseWriter = getSpaceOccupiedBySparseWriter(maxColumnsInPageZerothSegment);
+        pageZeroWriterFlavorSelector.switchPageZeroWriterIfNeeded(spaceOccupiedByDefaultWriter,
+                spaceOccupiedBySparseWriter, adaptive);
+
+        return Math.min(spaceOccupiedBySparseWriter, spaceOccupiedByDefaultWriter);
+    }
+
+    private int getSpaceOccupiedBySparseWriter(int maxColumnsInPageZerothSegment) {
+        int presentColumns = transformerForCurrentTuple.getNumberOfVisitedColumnsInBatch();
+        int numberOfPagesRequired = (int) Math.ceil(
+                (double) (presentColumns - maxColumnsInPageZerothSegment) / IColumnPageZeroWriter.MIN_COLUMN_SPACE);
+        int headerSpace = SparseColumnMultiPageZeroWriter.getHeaderSpace(numberOfPagesRequired);
+        presentColumns = Math.min(presentColumns, maxColumnsInPageZerothSegment);
 
         // space occupied by the sparse writer
-        int spaceOccupiedBySparseWriter = presentColumns
+        return headerSpace + presentColumns
                 * (SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
-        int spaceOccupiedByDefaultWriter = totalNumberOfColumns
-                * (DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
-        pageZeroWriterFlavorSelector.switchPageZeroWriterIfNeeded(spaceOccupiedByDefaultWriter,
-                spaceOccupiedBySparseWriter);
-        return Math.min(spaceOccupiedBySparseWriter, spaceOccupiedByDefaultWriter);
     }
 
     @Override
@@ -193,10 +213,9 @@
     }
 
     @Override
-    public final int flush(ByteBuffer pageZero, IColumnPageZeroWriter pageZeroWriter) throws HyracksDataException {
+    public final int flush(IColumnPageZeroWriter pageZeroWriter) throws HyracksDataException {
         int numberOfColumns = getAbsoluteNumberOfColumns(false);
         finalizer.finalizeBatchColumns(columnMetadata, presentColumnsIndexes, pageZeroWriter);
-        writer.setPageZeroWriter(pageZero, pageZeroWriter, toIndexArray(presentColumnsIndexes), numberOfColumns);
 
         //assertion logging
         int presentColumnsCount = presentColumnsIndexes.cardinality();
@@ -207,6 +226,7 @@
                     beforeTransformColumnCount, currentTupleColumnsCount, presentColumnsCount);
         }
 
+        writer.setPageZeroWriter(pageZeroWriter, toIndexArray(presentColumnsIndexes), numberOfColumns);
         return finalizer.finalizeBatch(writer);
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
index 466a0cf..293f764 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
@@ -18,12 +18,12 @@
  */
 package org.apache.asterix.column.operation.lsm.merge;
 
-import java.nio.ByteBuffer;
-import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Comparator;
 import java.util.List;
 import java.util.PriorityQueue;
 
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnTupleWriter;
 import org.apache.asterix.column.tuple.MergeColumnTupleReference;
 import org.apache.asterix.column.util.RunLengthIntArray;
 import org.apache.asterix.column.values.IColumnValuesReader;
@@ -32,6 +32,8 @@
 import org.apache.asterix.column.zero.PageZeroWriterFlavorSelector;
 import org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter;
 import org.apache.asterix.column.zero.writers.SparseColumnPageZeroWriter;
+import org.apache.asterix.column.zero.writers.multipage.DefaultColumnMultiPageZeroWriter;
+import org.apache.asterix.column.zero.writers.multipage.SparseColumnMultiPageZeroWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
@@ -44,8 +46,6 @@
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
-
 public class MergeColumnTupleWriter extends AbstractColumnTupleWriter {
     private final MergeColumnWriteMetadata columnMetadata;
     private final int maxLeafNodeSize;
@@ -56,7 +56,7 @@
     private final PriorityQueue<IColumnValuesWriter> orderedColumns;
     private final ColumnBatchWriter writer;
     private final IColumnPageZeroWriterFlavorSelector pageZeroWriterFlavorSelector;
-    protected final IntOpenHashSet presentColumnsIndexes;
+    protected final BitSet presentColumnsIndexes;
     private final int maxNumberOfTuples;
     private int primaryKeysEstimatedSize;
     private int numberOfAntiMatter;
@@ -67,7 +67,7 @@
         this.columnMetadata = columnMetadata;
         this.pageZeroWriterFlavorSelector = new PageZeroWriterFlavorSelector();
         this.maxLeafNodeSize = maxLeafNodeSize;
-        this.presentColumnsIndexes = new IntOpenHashSet();
+        this.presentColumnsIndexes = new BitSet();
         List<IColumnTupleIterator> componentsTuplesList = columnMetadata.getComponentsTuples();
         this.componentsTuples = new MergeColumnTupleReference[componentsTuplesList.size()];
         int totalLength = 0;
@@ -155,52 +155,78 @@
     }
 
     @Override
-    public int getColumnOccupiedSpace(boolean includeCurrentTupleColumns) {
-        int presentColumns = presentColumnsIndexes.size();
+    public int getPageZeroWriterOccupiedSpace(int maxColumnsInPageZerothSegment, boolean includeCurrentTupleColumns,
+            boolean adaptive) {
         int totalNumberOfColumns = getAbsoluteNumberOfColumns(includeCurrentTupleColumns);
+        totalNumberOfColumns = Math.min(totalNumberOfColumns, maxColumnsInPageZerothSegment);
+
+        int spaceOccupiedByDefaultWriter = DefaultColumnMultiPageZeroWriter.EXTENDED_HEADER_SIZE + totalNumberOfColumns
+                * (DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
+
+        if (!adaptive) {
+            // not adaptive: always use the default multi-page writer
+            return spaceOccupiedByDefaultWriter;
+        }
 
         // space occupied by the sparse writer
-        int spaceOccupiedBySparseWriter = presentColumns
-                * (SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
-        int spaceOccupiedByDefaultWriter = totalNumberOfColumns
-                * (DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
+        int spaceOccupiedBySparseWriter = getSpaceOccupiedBySparseWriter(maxColumnsInPageZerothSegment);
         pageZeroWriterFlavorSelector.switchPageZeroWriterIfNeeded(spaceOccupiedByDefaultWriter,
-                spaceOccupiedBySparseWriter);
+                spaceOccupiedBySparseWriter, adaptive);
         return Math.min(spaceOccupiedBySparseWriter, spaceOccupiedByDefaultWriter);
     }
 
+    private int getSpaceOccupiedBySparseWriter(int maxColumnsInPageZerothSegment) {
+        int presentColumns = presentColumnsIndexes.cardinality();
+        int numberOfPagesRequired = (int) Math.ceil(
+                (double) (presentColumns - maxColumnsInPageZerothSegment) / IColumnPageZeroWriter.MIN_COLUMN_SPACE);
+        int headerSpace = SparseColumnMultiPageZeroWriter.getHeaderSpace(numberOfPagesRequired);
+        presentColumns = Math.min(presentColumns, maxColumnsInPageZerothSegment);
+
+        // space occupied by the sparse writer
+        return headerSpace + presentColumns
+                * (SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
+    }
+
     @Override
-    public int flush(ByteBuffer pageZero, IColumnPageZeroWriter pageZeroWriter) throws HyracksDataException {
+    public int flush(IColumnPageZeroWriter pageZeroWriter) throws HyracksDataException {
         // here the numberOfColumns is the total number of columns present in the LSM Index (across all disk components)
         // Hence, a merge will fail if union(NumberOfColumns(D1) + NumberOfColumns(D2) + ... + NumberOfColumns(DN)) >
         // pageZero space, and since the merged page contains this many number of columns, the first flush will fail.
         int numberOfColumns = columnMetadata.getNumberOfColumns();
         int numberOfPrimaryKeys = columnMetadata.getNumberOfPrimaryKeys();
+
+        // If writtenComponents is not empty, process non-key columns
         if (writtenComponents.getSize() > 0) {
             writeNonKeyColumns();
             writtenComponents.reset();
         }
 
-        for (int columnIndex : presentColumnsIndexes) {
+        // Iterate over the BitSet (presentColumnsIndexes) to get the indexes of the set bits
+        for (int columnIndex = presentColumnsIndexes.nextSetBit(0); columnIndex >= 0; columnIndex =
+                presentColumnsIndexes.nextSetBit(columnIndex + 1)) {
             if (columnIndex < numberOfPrimaryKeys) {
-                continue;
+                continue; // Skip primary key columns
             }
             orderedColumns.add(columnMetadata.getWriter(columnIndex));
         }
 
-        // reset pageZeroWriter based on the writer
-        writer.setPageZeroWriter(pageZero, pageZeroWriter, getPresentColumnsIndexesArray(), numberOfColumns);
+        // Hand the selected page zero writer to the batch writer
+        writer.setPageZeroWriter(pageZeroWriter, toIndexArray(presentColumnsIndexes), numberOfColumns);
+
+        // Write primary key columns
         writer.writePrimaryKeyColumns(primaryKeyWriters);
+
+        // Write the other columns and get the total length
         int totalLength = writer.writeColumns(orderedColumns);
 
+        // Reset the anti-matter counter for the next batch
         numberOfAntiMatter = 0;
+
         return totalLength;
     }
 
-    public int[] getPresentColumnsIndexesArray() {
-        int[] sortedIndexes = presentColumnsIndexes.toIntArray();
-        Arrays.sort(sortedIndexes);
-        return sortedIndexes;
+    public static int[] toIndexArray(BitSet bitSet) {
+        return FlushColumnTupleWriter.toIndexArray(bitSet);
     }
 
     @Override
@@ -230,14 +256,17 @@
         for (int i = 0; i < writtenComponents.getNumberOfBlocks(); i++) {
             int componentIndex = writtenComponents.getBlockValue(i);
             if (componentIndex < 0) {
-                //Skip writing values of deleted tuples
+                // Skip writing values of deleted tuples
                 componentIndex = clearAntimatterIndicator(componentIndex);
                 skipReaders(componentIndex, writtenComponents.getBlockSize(i));
                 continue;
             }
             MergeColumnTupleReference componentTuple = componentsTuples[componentIndex];
             int count = writtenComponents.getBlockSize(i);
-            for (int columnIndex : presentColumnsIndexes) {
+
+            // Iterate over the set bits in presentColumnsIndexes
+            for (int columnIndex = presentColumnsIndexes.nextSetBit(0); columnIndex >= 0; columnIndex =
+                    presentColumnsIndexes.nextSetBit(columnIndex + 1)) {
                 if (columnIndex < columnMetadata.getNumberOfPrimaryKeys()) {
                     continue;
                 }
@@ -263,7 +292,9 @@
     private void skipReaders(int componentIndex, int count) throws HyracksDataException {
         MergeColumnTupleReference componentTuple = componentsTuples[componentIndex];
         try {
-            for (int columnIndex : presentColumnsIndexes) {
+            // Iterate over the set bits in presentColumnsIndexes
+            for (int columnIndex = presentColumnsIndexes.nextSetBit(0); columnIndex >= 0; columnIndex =
+                    presentColumnsIndexes.nextSetBit(columnIndex + 1)) {
                 if (columnIndex < columnMetadata.getNumberOfPrimaryKeys()) {
                     continue;
                 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
index 0c9c816..eb12a27 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.column.tuple;
 
+import java.util.BitSet;
+
 import org.apache.asterix.column.bytes.stream.in.MultiByteBufferInputStream;
 import org.apache.asterix.column.operation.lsm.merge.IEndOfPageCallBack;
 import org.apache.asterix.column.operation.lsm.merge.MergeColumnReadMetadata;
@@ -29,15 +31,13 @@
 import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
 import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
 
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
-
 public final class MergeColumnTupleReference extends AbstractAsterixColumnTupleReference {
     // NoOP callback is for empty pages only
     private static final IEndOfPageCallBack EMPTY_PAGE_CALLBACK = createNoOpCallBack();
     private final IColumnValuesReader[] columnReaders;
     private int skipCount;
     private IEndOfPageCallBack endOfPageCallBack;
-    private IntOpenHashSet presentColumnIndexes;
+    private BitSet presentColumnIndexes;
     private int mergingLength;
 
     public MergeColumnTupleReference(int componentIndex, ColumnBTreeReadLeafFrame frame,
@@ -155,7 +155,7 @@
         };
     }
 
-    public void setColumnIndexes(IntOpenHashSet presentColumnsIndexes) {
+    public void setColumnIndexes(BitSet presentColumnsIndexes) {
         this.presentColumnIndexes = presentColumnsIndexes;
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java
index 45d0008..ade1d38 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.column.values;
 
-import java.nio.ByteBuffer;
 import java.util.PriorityQueue;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -41,13 +40,12 @@
      * abstraction that supports different page zero layouts (default vs sparse).
      * The writer will be used to manage column offsets, filters, and primary key storage.
      * 
-     * @param pageZero The page zero buffer where metadata will be written
      * @param pageZeroWriter The writer implementation for page zero operations
      * @param presentColumnsIndexes Array of column indexes that contain data in this batch
      * @param numberOfColumns Total number of columns in the schema
      */
-    void setPageZeroWriter(ByteBuffer pageZero, IColumnPageZeroWriter pageZeroWriter, int[] presentColumnsIndexes,
-            int numberOfColumns);
+    void setPageZeroWriter(IColumnPageZeroWriter pageZeroWriter, int[] presentColumnsIndexes, int numberOfColumns)
+            throws HyracksDataException;
 
     /**
      * Writes the primary keys' values to Page0
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java
index 1a767a9..e1bf8e3 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.column.values.writer;
 
-import java.nio.ByteBuffer;
 import java.util.PriorityQueue;
 
 import org.apache.asterix.column.bytes.stream.out.MultiPersistentBufferBytesOutputStream;
@@ -61,17 +60,16 @@
      * This method replaces the direct page zero buffer manipulation with a more abstracted approach,
      * which allows for different page zero layouts (default or sparse).
      *
-     * @param pageZero The page zero buffer to be used
      * @param pageZeroWriter The writer implementation for page zero operations
      * @param presentColumnsIndexes Array containing the indexes of columns present in this batch
      * @param numberOfColumns Total number of columns in the schema
      */
-    public void setPageZeroWriter(ByteBuffer pageZero, IColumnPageZeroWriter pageZeroWriter,
-            int[] presentColumnsIndexes, int numberOfColumns) {
+    public void setPageZeroWriter(IColumnPageZeroWriter pageZeroWriter, int[] presentColumnsIndexes,
+            int numberOfColumns) throws HyracksDataException {
         this.pageZeroWriter = pageZeroWriter;
-        pageZeroWriter.reset(pageZero, presentColumnsIndexes, numberOfColumns);
+        pageZeroWriter.resetBasedOnColumns(presentColumnsIndexes, numberOfColumns);
         pageZeroWriter.allocateColumns();
-        nonKeyColumnStartOffset = pageZeroWriter.getPageZeroBuffer().capacity();
+        nonKeyColumnStartOffset = pageZeroWriter.getPageZeroBufferCapacity();
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/PageZeroWriterFlavorSelector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/PageZeroWriterFlavorSelector.java
index da7c9f2..c4f9bd5 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/PageZeroWriterFlavorSelector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/PageZeroWriterFlavorSelector.java
@@ -19,12 +19,19 @@
 package org.apache.asterix.column.zero;
 
 import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter.DEFAULT_WRITER_FLAG;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter.MULTI_PAGE_DEFAULT_WRITER_FLAG;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter.MULTI_PAGE_SPARSE_WRITER_FLAG;
 import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter.SPARSE_WRITER_FLAG;
 
 import org.apache.asterix.column.zero.readers.DefaultColumnPageZeroReader;
 import org.apache.asterix.column.zero.readers.SparseColumnPageZeroReader;
 import org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter;
 import org.apache.asterix.column.zero.writers.SparseColumnPageZeroWriter;
+import org.apache.asterix.column.zero.writers.multipage.DefaultColumnMultiPageZeroReader;
+import org.apache.asterix.column.zero.writers.multipage.DefaultColumnMultiPageZeroWriter;
+import org.apache.asterix.column.zero.writers.multipage.SparseColumnMultiPageZeroReader;
+import org.apache.asterix.column.zero.writers.multipage.SparseColumnMultiPageZeroWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
 import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroReader;
 import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter;
 import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriterFlavorSelector;
@@ -43,7 +50,7 @@
  */
 public class PageZeroWriterFlavorSelector implements IColumnPageZeroWriterFlavorSelector {
     // Flag indicating which writer type is currently selected (DEFAULT_WRITER_FLAG=default, SPARSE_WRITER_FLAG=sparse)
-    protected byte writerFlag = DEFAULT_WRITER_FLAG;
+    protected byte writerFlag = MULTI_PAGE_DEFAULT_WRITER_FLAG;
 
     // Cache of writer instances to avoid repeated object creation
     private final Byte2ObjectArrayMap<IColumnPageZeroWriter> writers;
@@ -65,13 +72,19 @@
      * @param spaceOccupiedBySparseWriter Space in bytes required by the sparse writer
      */
     @Override
-    public void switchPageZeroWriterIfNeeded(int spaceOccupiedByDefaultWriter, int spaceOccupiedBySparseWriter) {
+    public void switchPageZeroWriterIfNeeded(int spaceOccupiedByDefaultWriter, int spaceOccupiedBySparseWriter,
+            boolean adaptive) {
+        if (!adaptive) {
+            // If not adaptive, always use the default multi-page writer
+            writerFlag = MULTI_PAGE_DEFAULT_WRITER_FLAG;
+            return;
+        }
         if (spaceOccupiedByDefaultWriter <= spaceOccupiedBySparseWriter) {
             // Default writer is more space-efficient (or equal), use it
-            writerFlag = DEFAULT_WRITER_FLAG;
+            writerFlag = MULTI_PAGE_DEFAULT_WRITER_FLAG;
         } else {
             // Sparse writer is more space-efficient, use it
-            writerFlag = SPARSE_WRITER_FLAG;
+            writerFlag = MULTI_PAGE_SPARSE_WRITER_FLAG;
         }
     }
 
@@ -83,12 +96,15 @@
      * @throws IllegalStateException if an unsupported writer flag is encountered
      */
     @Override
-    public IColumnPageZeroWriter getPageZeroWriter() {
-       return switch (writerFlag) {
-           case DEFAULT_WRITER_FLAG -> writers.computeIfAbsent(DEFAULT_WRITER_FLAG, k -> new DefaultColumnPageZeroWriter());
-           case SPARSE_WRITER_FLAG -> writers.computeIfAbsent(SPARSE_WRITER_FLAG, k -> new SparseColumnPageZeroWriter());
-           default -> throw new IllegalStateException("Unsupported writer flag: " + writerFlag);
-       };
+    public IColumnPageZeroWriter getPageZeroWriter(IColumnWriteMultiPageOp multiPageOpRef, int zerothSegmentMaxColumns,
+            int bufferCapacity) {
+        return switch (writerFlag) {
+            case DEFAULT_WRITER_FLAG -> writers.computeIfAbsent(DEFAULT_WRITER_FLAG, k -> new DefaultColumnPageZeroWriter());
+            case SPARSE_WRITER_FLAG -> writers.computeIfAbsent(SPARSE_WRITER_FLAG, k -> new SparseColumnPageZeroWriter());
+            case MULTI_PAGE_DEFAULT_WRITER_FLAG -> writers.computeIfAbsent(MULTI_PAGE_DEFAULT_WRITER_FLAG, k -> new DefaultColumnMultiPageZeroWriter(multiPageOpRef, zerothSegmentMaxColumns, bufferCapacity));
+            case MULTI_PAGE_SPARSE_WRITER_FLAG -> writers.computeIfAbsent(MULTI_PAGE_SPARSE_WRITER_FLAG, k -> new SparseColumnMultiPageZeroWriter(multiPageOpRef, zerothSegmentMaxColumns, bufferCapacity));
+            default -> throw new IllegalStateException("Unsupported writer flag: " + writerFlag);
+        };
     }
 
     /**
@@ -101,11 +117,13 @@
      * @throws IllegalStateException if an unsupported reader flag is encountered
      */
     @Override
-    public IColumnPageZeroReader createPageZeroReader(byte flag) {
-        return switch (flag) {
-            case DEFAULT_WRITER_FLAG -> readers.computeIfAbsent(DEFAULT_WRITER_FLAG, k -> new DefaultColumnPageZeroReader());
-            case SPARSE_WRITER_FLAG -> readers.computeIfAbsent(SPARSE_WRITER_FLAG, k -> new SparseColumnPageZeroReader());
-            default -> throw new IllegalStateException("Unsupported reader flag: " + flag);
-        };
+    public IColumnPageZeroReader createPageZeroReader(byte flag, int bufferCapacity) {
+        return switch (flag) {
+            case DEFAULT_WRITER_FLAG -> readers.computeIfAbsent(DEFAULT_WRITER_FLAG, k -> new DefaultColumnPageZeroReader());
+            case SPARSE_WRITER_FLAG -> readers.computeIfAbsent(SPARSE_WRITER_FLAG, k -> new SparseColumnPageZeroReader());
+            case MULTI_PAGE_DEFAULT_WRITER_FLAG -> readers.computeIfAbsent(MULTI_PAGE_DEFAULT_WRITER_FLAG, k -> new DefaultColumnMultiPageZeroReader(bufferCapacity));
+            case MULTI_PAGE_SPARSE_WRITER_FLAG -> readers.computeIfAbsent(MULTI_PAGE_SPARSE_WRITER_FLAG, k -> new SparseColumnMultiPageZeroReader(bufferCapacity));
+            default -> throw new IllegalStateException("Unsupported reader flag: " + flag);
+        };
     }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java
index b643fd6..1692e70 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.column.zero.readers;
 
-import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.HEADER_SIZE;
 import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.LEFT_MOST_KEY_OFFSET;
 import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.MEGA_LEAF_NODE_LENGTH;
 import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.NEXT_LEAF_OFFSET;
@@ -27,48 +26,67 @@
 import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.TUPLE_COUNT_OFFSET;
 
 import java.nio.ByteBuffer;
+import java.util.BitSet;
 
 import org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
 import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IntPairUtil;
 import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroReader;
 
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
-
 public class DefaultColumnPageZeroReader implements IColumnPageZeroReader {
     protected ByteBuffer pageZeroBuf;
+    protected BitSet pageZeroSegmentsPages;
+    protected int numberOfPresentColumns;
+    protected int headerSize;
+
+    public DefaultColumnPageZeroReader() {
+        this.pageZeroSegmentsPages = new BitSet();
+    }
 
     @Override
-    public void reset(ByteBuffer pageZeroBuf) {
+    public void reset(ByteBuffer pageZeroBuf, int headerSize) {
         this.pageZeroBuf = pageZeroBuf;
+        this.numberOfPresentColumns = pageZeroBuf.getInt(NUMBER_OF_COLUMNS_OFFSET);
+        this.headerSize = headerSize;
+    }
+
+    public void reset(ByteBuffer pageZeroBuf, int numberOfPresentColumns, int headerSize) {
+        this.pageZeroBuf = pageZeroBuf;
+        this.numberOfPresentColumns = numberOfPresentColumns;
+        this.headerSize = headerSize;
     }
 
     @Override
     public int getColumnOffset(int columnIndex) {
-        return pageZeroBuf.getInt(HEADER_SIZE + columnIndex * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE);
+        return pageZeroBuf.getInt(headerSize + columnIndex * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE);
     }
 
-    @Override
-    public int getColumnFilterOffset(int columnIndex) {
-        int columnsOffsetEnd =
-                HEADER_SIZE + getNumberOfPresentColumns() * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+    protected int getColumnFilterOffset(int columnIndex) {
+        int columnsOffsetEnd = headerSize + numberOfPresentColumns * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
         return columnsOffsetEnd + columnIndex * DefaultColumnPageZeroWriter.FILTER_SIZE;
     }
 
     @Override
-    public long getLong(int offset) {
-        return pageZeroBuf.getLong(offset);
+    public long getColumnFilterMin(int columnIndex) {
+        int filterOffset = getColumnFilterOffset(columnIndex);
+        return pageZeroBuf.getLong(filterOffset);
+    }
+
+    @Override
+    public long getColumnFilterMax(int columnIndex) {
+        int filterOffset = getColumnFilterOffset(columnIndex);
+        return pageZeroBuf.getLong(filterOffset + Long.BYTES);
     }
 
     @Override
     public void skipFilters() {
-        int filterEndOffset = getColumnFilterOffset(getNumberOfPresentColumns());
+        int filterEndOffset = getColumnFilterOffset(numberOfPresentColumns);
         pageZeroBuf.position(filterEndOffset);
     }
 
     @Override
     public void skipColumnOffsets() {
-        int columnEndOffset =
-                HEADER_SIZE + getNumberOfPresentColumns() * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+        int columnEndOffset = headerSize + numberOfPresentColumns * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
         pageZeroBuf.position(columnEndOffset);
     }
 
@@ -78,6 +96,11 @@
     }
 
     @Override
+    public int getNumberOfPageZeroSegments() {
+        return 1;
+    }
+
+    @Override
     public int getLeftMostKeyOffset() {
         return pageZeroBuf.getInt(LEFT_MOST_KEY_OFFSET);
     }
@@ -114,16 +137,13 @@
 
     @Override
     public boolean isValidColumn(int columnIndex) {
-        int relativeColumnIndex = getRelativeColumnIndex(columnIndex);
-        return relativeColumnIndex < getNumberOfPresentColumns();
+        return columnIndex < numberOfPresentColumns;
     }
 
     @Override
-    public void getAllColumns(IntOpenHashSet presentColumns) {
-        int numberOfColumns = getNumberOfPresentColumns();
-        for (int i = 0; i < numberOfColumns; i++) {
-            presentColumns.add(i);
-        }
+    public void getAllColumns(BitSet presentColumns) {
+        int numberOfColumns = numberOfPresentColumns;
+        presentColumns.set(0, numberOfColumns);
     }
 
     @Override
@@ -133,11 +153,33 @@
 
     @Override
     public void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
-        int columnOffsetStart = HEADER_SIZE;
+        int columnOffsetStart = headerSize;
         for (int i = 0; i < offsetColumnIndexPairs.length; i++) {
             int offset = pageZeroBuf.getInt(columnOffsetStart);
             offsetColumnIndexPairs[i] = IntPairUtil.of(offset, i);
             columnOffsetStart += DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
         }
     }
+
+    @Override
+    public BitSet getPageZeroSegmentsPages() {
+        return pageZeroSegmentsPages;
+    }
+
+    @Override
+    public int getHeaderSize() {
+        return headerSize;
+    }
+
+    @Override
+    public void resetStream(IColumnBufferProvider pageZeroSegmentBufferProvider) {
+        throw new UnsupportedOperationException("Not supported for DefaultColumnPageZeroReader");
+    }
+
+    @Override
+    public BitSet markRequiredPageSegments(BitSet projectedColumns, int pageZeroId, boolean markAll) {
+        pageZeroSegmentsPages.clear();
+        pageZeroSegmentsPages.set(0);
+        return pageZeroSegmentsPages;
+    }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/SparseColumnPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/SparseColumnPageZeroReader.java
index 44d2369..3b4fdc4 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/SparseColumnPageZeroReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/SparseColumnPageZeroReader.java
@@ -18,16 +18,14 @@
  */
 package org.apache.asterix.column.zero.readers;
 
-import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.HEADER_SIZE;
-
 import java.nio.ByteBuffer;
+import java.util.BitSet;
 
 import org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter;
 import org.apache.asterix.column.zero.writers.SparseColumnPageZeroWriter;
 import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IntPairUtil;
 
 import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 
 public class SparseColumnPageZeroReader extends DefaultColumnPageZeroReader {
     private final Int2IntOpenHashMap columnIndexToRelativeColumnIndex;
@@ -38,8 +36,8 @@
     }
 
     @Override
-    public void reset(ByteBuffer pageZeroBuf) {
-        super.reset(pageZeroBuf);
+    public void reset(ByteBuffer pageZeroBuf, int headerSize) {
+        super.reset(pageZeroBuf, headerSize);
         columnIndexToRelativeColumnIndex.clear();
     }
 
@@ -47,28 +45,26 @@
     public int getColumnOffset(int columnIndex) {
         int relativeColumnIndex = getRelativeColumnIndex(columnIndex);
         return pageZeroBuf.getInt(
-                HEADER_SIZE + relativeColumnIndex * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + Integer.BYTES);
+                headerSize + relativeColumnIndex * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + Integer.BYTES);
     }
 
     @Override
     public int getColumnFilterOffset(int columnIndex) {
         int relativeColumnIndex = getRelativeColumnIndex(columnIndex);
-        int columnsOffsetEnd =
-                HEADER_SIZE + getNumberOfPresentColumns() * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+        int columnsOffsetEnd = headerSize + numberOfPresentColumns * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
         return columnsOffsetEnd + relativeColumnIndex * DefaultColumnPageZeroWriter.FILTER_SIZE;
     }
 
     @Override
     public void skipFilters() {
-        int filterEndOffset = HEADER_SIZE + getNumberOfPresentColumns()
-                * (SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
+        int filterEndOffset = headerSize + numberOfPresentColumns
+                * (SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + SparseColumnPageZeroWriter.FILTER_SIZE);
         pageZeroBuf.position(filterEndOffset);
     }
 
     @Override
     public void skipColumnOffsets() {
-        int columnsOffsetEnd =
-                HEADER_SIZE + getNumberOfPresentColumns() * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+        int columnsOffsetEnd = headerSize + numberOfPresentColumns * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
         pageZeroBuf.position(columnsOffsetEnd);
     }
 
@@ -87,7 +83,7 @@
             return 0;
         }
 
-        int totalColumns = getNumberOfPresentColumns();
+        int totalColumns = numberOfPresentColumns;
         int lastColumnIndex = getColumnIndex(totalColumns - 1);
         int lastColumn = pageZeroBuf.getInt(lastColumnIndex);
         if (lastColumn == columnIndex) {
@@ -115,7 +111,7 @@
     }
 
     private int getColumnIndex(int index) {
-        return HEADER_SIZE + index * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+        return headerSize + index * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
     }
 
     @Override
@@ -125,11 +121,17 @@
     }
 
     @Override
-    public void getAllColumns(IntOpenHashSet presentColumns) {
-        int columnIndex = getColumnIndex(0);
-        for (int i = 0; i < getNumberOfPresentColumns(); i++) {
+    public void getAllColumns(BitSet presentColumns) {
+        if (numberOfPresentColumns == 0) {
+            return;
+        }
+
+        int columnIndex = headerSize;
+        int limit = columnIndex + numberOfPresentColumns * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+
+        while (columnIndex < limit) {
             int column = pageZeroBuf.getInt(columnIndex);
-            presentColumns.add(column);
+            presentColumns.set(column);
             columnIndex += SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
         }
     }
@@ -137,7 +139,7 @@
     @Override
     public void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
         int columnIndex = getColumnIndex(0);
-        for (int i = 0; i < getNumberOfPresentColumns(); i++) {
+        for (int i = 0; i < numberOfPresentColumns; i++) {
             int column = pageZeroBuf.getInt(columnIndex);
             int offset = pageZeroBuf.getInt(columnIndex + SparseColumnPageZeroWriter.COLUMN_INDEX_SIZE);
             offsetColumnIndexPairs[i] = IntPairUtil.of(offset, column);
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/DefaultColumnPageZeroWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/DefaultColumnPageZeroWriter.java
index 3d7aa93..e7d4e66 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/DefaultColumnPageZeroWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/DefaultColumnPageZeroWriter.java
@@ -18,21 +18,33 @@
  */
 package org.apache.asterix.column.zero.writers;
 
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.FLAG_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.HEADER_SIZE;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.LEFT_MOST_KEY_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.MEGA_LEAF_NODE_LENGTH;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.NUMBER_OF_COLUMNS_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.RIGHT_MOST_KEY_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.SIZE_OF_COLUMNS_OFFSETS_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.TUPLE_COUNT_OFFSET;
+
 import java.nio.ByteBuffer;
 import java.util.BitSet;
 
 import org.apache.asterix.column.bytes.stream.out.ByteBufferOutputStream;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
 import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter;
 import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IValuesWriter;
 
 /**
  * Default implementation of page zero writer that allocates space for all columns in the schema.
- * 
+ * <p>
  * This writer uses a fixed layout where every column in the schema has a reserved slot,
  * regardless of whether data is present for that column. This approach is optimal for
  * dense datasets where most columns contain data.
- * 
+ * <p>
  * Memory layout in page zero:
  * 1. Column offsets: 4 bytes per column (numberOfColumns * 4 bytes)
  * 2. Column filters: 16 bytes per column (numberOfColumns * 16 bytes) - min/max values
@@ -44,24 +56,26 @@
     /** Size in bytes for storing column filter (min + max values) */
     public static final int FILTER_SIZE = Long.BYTES * 2; // min and max
 
-    private final ByteBufferOutputStream primaryKeys;
-    private ByteBuffer pageZero;
+    protected final ByteBufferOutputStream primaryKeys;
+    protected ByteBuffer pageZero;
+    protected int headerSize;
     private int numberOfColumns;
 
     // Offset positions within page zero buffer
-    private int primaryKeysOffset; // Where primary key data starts
-    private int columnsOffset; // Where column offset array starts  
-    private int filtersOffset; // Where column filter array starts
+    protected int primaryKeysOffset; // Where primary key data starts
+    protected int columnsOffset; // Where column offset array starts
+    protected int filtersOffset; // Where column filter array starts
 
     public DefaultColumnPageZeroWriter() {
         primaryKeys = new ByteBufferOutputStream();
     }
 
     @Override
-    public void reset(ByteBuffer pageZeroBuf, int[] presentColumns, int numberOfColumns) {
-        this.pageZero = pageZeroBuf;
+    public void resetBasedOnColumns(int[] presentColumns, int numberOfColumns, int headerSize) {
         this.numberOfColumns = numberOfColumns;
-        this.primaryKeysOffset = pageZeroBuf.position();
+        primaryKeysOffset = headerSize;
+        this.headerSize = headerSize;
+        pageZero.position(headerSize);
     }
 
     @Override
@@ -71,7 +85,7 @@
 
     /**
      * Allocates space in page zero for all column metadata.
-     * 
+     * <p>
      * The allocation strategy reserves space for all columns in the schema:
      * - Column offsets: Fixed array of 4-byte integers
      * - Column filters: Fixed array of 16-byte min/max pairs
@@ -140,6 +154,48 @@
     }
 
     @Override
+    public void setPageZero(ByteBuffer pageZero) {
+        // sets the pageZero buffer; the only caller is the MultiColumnPageZeroWriter
+        this.pageZero = pageZero;
+    }
+
+    public void flush(ByteBuffer buf, int numberOfTuples, ITupleReference minKey, ITupleReference maxKey,
+            AbstractColumnTupleWriter columnWriter, ITreeIndexTupleWriter rowTupleWriter) throws HyracksDataException {
+        this.pageZero = buf;
+        // Prepare the space for writing the columns' information such as the primary keys
+        pageZero.position(HEADER_SIZE);
+        this.primaryKeysOffset = buf.position();
+        // Flush the columns to the persisted pages and write the length of the mega leaf node in pageZero
+        pageZero.putInt(MEGA_LEAF_NODE_LENGTH, columnWriter.flush(this));
+        // Write min and max keys
+        int offset = buf.position();
+        buf.putInt(LEFT_MOST_KEY_OFFSET, offset);
+        offset += rowTupleWriter.writeTuple(minKey, buf.array(), offset);
+        buf.putInt(RIGHT_MOST_KEY_OFFSET, offset);
+        rowTupleWriter.writeTuple(maxKey, buf.array(), offset);
+
+        // Write page information
+        buf.putInt(TUPLE_COUNT_OFFSET, numberOfTuples);
+        buf.put(FLAG_OFFSET, flagCode());
+        buf.putInt(NUMBER_OF_COLUMNS_OFFSET, getNumberOfColumns());
+        buf.putInt(SIZE_OF_COLUMNS_OFFSETS_OFFSET, getColumnOffsetsSize());
+
+        // reset the collected meta info
+        columnWriter.reset();
+    }
+
+    public void flush(ByteBuffer buf, int numberOfTuples, AbstractColumnTupleWriter writer)
+            throws HyracksDataException {
+        this.pageZero = buf;
+        pageZero.position(HEADER_SIZE);
+        this.primaryKeysOffset = buf.position();
+        pageZero.putInt(MEGA_LEAF_NODE_LENGTH, writer.flush(this));
+        buf.putInt(NUMBER_OF_COLUMNS_OFFSET, getNumberOfColumns());
+        buf.putInt(TUPLE_COUNT_OFFSET, numberOfTuples);
+    }
+
+    @Override
     public int getNumberOfColumns() {
         return numberOfColumns;
     }
@@ -158,8 +214,8 @@
     }
 
     @Override
-    public ByteBuffer getPageZeroBuffer() {
-        return pageZero;
+    public int getPageZeroBufferCapacity() {
+        return pageZero.capacity();
     }
 
     /**
@@ -177,4 +233,9 @@
     public int getColumnOffsetsSize() {
         return numberOfColumns * COLUMN_OFFSET_SIZE;
     }
+
+    @Override
+    public int getHeaderSize() {
+        return headerSize;
+    }
 }
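The default writer lays page zero out as a fixed offset array, then a fixed filter array, then the primary keys. A small sketch of how the slot positions fall out of that layout; the header size and column count are assumed values and the class is purely illustrative:

    public class DefaultPageZeroLayoutSketch {
        static final int COLUMN_OFFSET_SIZE = Integer.BYTES; // 4 bytes per column offset
        static final int FILTER_SIZE = Long.BYTES * 2;       // 16 bytes per column filter (min + max)

        public static void main(String[] args) {
            int headerSize = 64;       // assumed header size, for illustration only
            int numberOfColumns = 100; // assumed schema width

            int columnsOffset = headerSize;                                            // offset array start
            int filtersOffset = columnsOffset + numberOfColumns * COLUMN_OFFSET_SIZE;  // filter array start
            int primaryKeysOffset = filtersOffset + numberOfColumns * FILTER_SIZE;     // primary key data start

            int column = 10;
            int offsetSlot = columnsOffset + column * COLUMN_OFFSET_SIZE;
            int filterSlot = filtersOffset + column * FILTER_SIZE;
            System.out.printf("offset slot: %d, filter slot: %d, primary keys start at: %d%n",
                    offsetSlot, filterSlot, primaryKeysOffset);
        }
    }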
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/SparseColumnPageZeroWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/SparseColumnPageZeroWriter.java
index f74b575..4fc2550 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/SparseColumnPageZeroWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/SparseColumnPageZeroWriter.java
@@ -20,55 +20,53 @@
 
 import static org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter.FILTER_SIZE;
 
-import java.nio.ByteBuffer;
 import java.util.BitSet;
 
-import org.apache.asterix.column.bytes.stream.out.ByteBufferOutputStream;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter;
 import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IValuesWriter;
 
 /**
  * Sparse implementation of page zero writer that only allocates space for present columns.
- * 
+ * <p>
  * This writer optimizes space usage for sparse datasets by storing only the columns
  * that actually contain data. Each column entry includes both the column index and
  * its data offset, allowing for efficient lookup while minimizing space overhead.
- *
+ * <p>
  * Memory layout in page zero:
  * 1. Column entries: 8 bytes per present column (4 bytes index + 4 bytes offset)
  * 2. Column filters: 16 bytes per present column (min/max values)
  * 3. Primary key data: variable size, written sequentially
- *
+ * <p>
  * This layout is particularly beneficial when the number of present columns is
  * significantly smaller than the total schema size.
  */
-public class SparseColumnPageZeroWriter implements IColumnPageZeroWriter {
+public class SparseColumnPageZeroWriter extends DefaultColumnPageZeroWriter {
     /** Size in bytes for storing a column index */
     public static final int COLUMN_INDEX_SIZE = Integer.BYTES;
     /** Size in bytes for storing a column entry (index + offset) */
     public static final int COLUMN_OFFSET_SIZE = Integer.BYTES + COLUMN_INDEX_SIZE;
 
-    private final ByteBufferOutputStream primaryKeys;
     private int[] presentColumns;
     private int numberOfPresentColumns;
-    private ByteBuffer pageZero;
-
-    // Offset positions within page zero buffer
-    private int primaryKeysOffset; // Where primary key data starts
-    private int columnsOffset; // Where column entries array starts
-    private int filtersOffset; // Where column filter array starts
 
     public SparseColumnPageZeroWriter() {
-        primaryKeys = new ByteBufferOutputStream();
+        super();
     }
 
     @Override
-    public void reset(ByteBuffer pageZeroBuf, int[] presentColumns, int numberOfColumns /* not being used */) {
-        this.pageZero = pageZeroBuf;
+    public void resetBasedOnColumns(int[] presentColumns, int numberOfColumns /* not being used */, int headerSize) {
         this.presentColumns = presentColumns;
         this.numberOfPresentColumns = presentColumns.length;
-        this.primaryKeysOffset = pageZeroBuf.position();
+        this.primaryKeysOffset = headerSize;
+        this.headerSize = headerSize;
+        pageZero.position(headerSize);
+    }
+
+    public void resetInnerBasedOnColumns(int[] presentColumns, int numberOfPresentColumns, int headerSize) {
+        this.presentColumns = presentColumns;
+        this.numberOfPresentColumns = numberOfPresentColumns;
+        this.primaryKeysOffset = headerSize; // Reset primary keys offset for sparse layout
+        pageZero.position(headerSize);
     }
 
     @Override
@@ -159,9 +157,9 @@
      * @param columnIndex The absolute column index to find
      * @return the relative position within present columns, or -1 if not found
      */
-    private int findColumnIndex(int columnIndex) {
+    public int findColumnIndex(int[] presentColumns, int numberOfPresentColumns, int columnIndex) {
         int low = 0;
-        int high = presentColumns.length - 1;
+        int high = numberOfPresentColumns - 1;
         while (low <= high) {
             int mid = (low + high) >>> 1;
             int midVal = presentColumns[mid];
@@ -194,13 +192,8 @@
     }
 
     @Override
-    public ByteBuffer getPageZeroBuffer() {
-        return pageZero;
-    }
-
-    @Override
     public int getNumberOfColumns() {
-        return presentColumns.length;
+        return numberOfPresentColumns;
     }
 
     /**
@@ -215,7 +208,7 @@
      */
     @Override
     public int getRelativeColumnIndex(int columnIndex) {
-        int columnRelativeIndex = findColumnIndex(columnIndex);
+        int columnRelativeIndex = findColumnIndex(presentColumns, numberOfPresentColumns, columnIndex);
         if (columnRelativeIndex == -1) {
             throw new IllegalStateException("Column index " + columnIndex + " does not exist in present columns.");
         }
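findColumnIndex above is a plain binary search over the sorted present-column indices, now parameterized with an explicit length so multi-page callers can reuse it. A standalone sketch of the same lookup under assumed inputs (not the project's API):

    public class PresentColumnBinarySearchSketch {
        // Returns the relative position of columnIndex within presentColumns[0..numberOfPresentColumns),
        // or -1 if the column is not present.
        static int findColumnIndex(int[] presentColumns, int numberOfPresentColumns, int columnIndex) {
            int low = 0;
            int high = numberOfPresentColumns - 1;
            while (low <= high) {
                int mid = (low + high) >>> 1;
                int midVal = presentColumns[mid];
                if (midVal < columnIndex) {
                    low = mid + 1;
                } else if (midVal > columnIndex) {
                    high = mid - 1;
                } else {
                    return mid;
                }
            }
            return -1;
        }

        public static void main(String[] args) {
            int[] presentColumns = { 2, 5, 9, 14, 27 }; // sorted present-column indices
            System.out.println(findColumnIndex(presentColumns, presentColumns.length, 9));  // 2
            System.out.println(findColumnIndex(presentColumns, presentColumns.length, 10)); // -1
        }
    }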
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/AbstractColumnMultiPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/AbstractColumnMultiPageZeroReader.java
new file mode 100644
index 0000000..3b05fc3
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/AbstractColumnMultiPageZeroReader.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.zero.writers.multipage;
+
+import org.apache.asterix.column.bytes.stream.in.MultiPageZeroByteBuffersReader;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroReader;
+
+public abstract class AbstractColumnMultiPageZeroReader implements IColumnPageZeroReader {
+    protected MultiPageZeroByteBuffersReader segmentBuffers;
+
+    AbstractColumnMultiPageZeroReader() {
+        segmentBuffers = new MultiPageZeroByteBuffersReader();
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroReader.java
new file mode 100644
index 0000000..db800ae
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroReader.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.zero.writers.multipage;
+
+import static org.apache.asterix.column.zero.writers.multipage.DefaultColumnMultiPageZeroWriter.EXTENDED_HEADER_SIZE;
+import static org.apache.asterix.column.zero.writers.multipage.DefaultColumnMultiPageZeroWriter.MAX_COLUMNS_IN_ZEROTH_SEGMENT;
+import static org.apache.asterix.column.zero.writers.multipage.DefaultColumnMultiPageZeroWriter.NUMBER_OF_PAGE_ZERO_SEGMENTS_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.LEFT_MOST_KEY_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.MEGA_LEAF_NODE_LENGTH;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.NEXT_LEAF_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.NUMBER_OF_COLUMNS_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.RIGHT_MOST_KEY_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.TUPLE_COUNT_OFFSET;
+
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.asterix.column.zero.readers.DefaultColumnPageZeroReader;
+import org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IntPairUtil;
+
+public class DefaultColumnMultiPageZeroReader extends AbstractColumnMultiPageZeroReader {
+    public static final int headerSize = DefaultColumnMultiPageZeroWriter.EXTENDED_HEADER_SIZE;
+    private final DefaultColumnPageZeroReader zerothSegmentReader;
+
+    private final int maxNumberOfColumnsInAPage;
+    private final BitSet pageZeroSegmentsPages;
+    private int zerothSegmentMaxColumns;
+    private int numberOfPageZeroSegments; // includes the zeroth segment
+    private ByteBuffer pageZeroBuf;
+
+    private final VoidPointable offsetPointable;
+
+    public DefaultColumnMultiPageZeroReader(int bufferCapacity) {
+        super();
+        zerothSegmentReader = new DefaultColumnPageZeroReader();
+        this.pageZeroSegmentsPages = new BitSet();
+        this.maxNumberOfColumnsInAPage = bufferCapacity
+                / (DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
+        this.offsetPointable = new VoidPointable();
+    }
+
+    @Override
+    public void resetStream(IColumnBufferProvider pageZeroSegmentBufferProvider) throws HyracksDataException {
+        segmentBuffers.reset(pageZeroSegmentBufferProvider);
+    }
+
+    @Override
+    public void reset(ByteBuffer pageZeroBuf) {
+        this.pageZeroBuf = pageZeroBuf;
+        zerothSegmentMaxColumns = pageZeroBuf.getInt(MAX_COLUMNS_IN_ZEROTH_SEGMENT);
+        zerothSegmentReader.reset(pageZeroBuf, Math.min(zerothSegmentMaxColumns, getNumberOfPresentColumns()),
+                headerSize);
+        numberOfPageZeroSegments = pageZeroBuf.getInt(NUMBER_OF_PAGE_ZERO_SEGMENTS_OFFSET);
+    }
+
+    @Override
+    public void reset(ByteBuffer pageZeroBuf, int headerSize) {
+        throw new UnsupportedOperationException("This method should not be called for multi-page readers.");
+    }
+
+    @Override
+    public int getColumnOffset(int columnIndex) throws HyracksDataException {
+        try {
+            if (columnIndex < zerothSegmentMaxColumns) {
+                return zerothSegmentReader.getColumnOffset(columnIndex);
+            } else {
+                int segmentIndex = (columnIndex - zerothSegmentMaxColumns) / maxNumberOfColumnsInAPage;
+                int columnIndexInRequiredSegment = (columnIndex - zerothSegmentMaxColumns) % maxNumberOfColumnsInAPage;
+                int segmentOffset = columnIndexInRequiredSegment * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+                segmentBuffers.read(segmentIndex, offsetPointable, segmentOffset,
+                        DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE);
+                return IntegerPointable.getInteger(offsetPointable.getByteArray(), offsetPointable.getStartOffset());
+            }
+        } catch (EOFException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public int getNumberOfPageZeroSegments() {
+        return numberOfPageZeroSegments;
+    }
+
+    @Override
+    public BitSet getPageZeroSegmentsPages() {
+        // If pageZeroSegmentsPages is null, the CloudReadContext is not being used,
+        // which indicates that all the segments are being read.
+        return pageZeroSegmentsPages;
+    }
+
+    @Override
+    public long getColumnFilterMin(int columnIndex) throws HyracksDataException {
+        try {
+            if (columnIndex < zerothSegmentMaxColumns) {
+                return zerothSegmentReader.getColumnFilterMin(columnIndex);
+            } else {
+                int segmentIndex = (columnIndex - zerothSegmentMaxColumns) / maxNumberOfColumnsInAPage;
+                int columnIndexInRequiredSegment = (columnIndex - zerothSegmentMaxColumns) % maxNumberOfColumnsInAPage;
+                int segmentOffset =
+                        findNumberOfColumnsInSegment(segmentIndex) * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE
+                                + columnIndexInRequiredSegment * DefaultColumnPageZeroWriter.FILTER_SIZE;
+                segmentBuffers.read(segmentIndex, offsetPointable, segmentOffset, Long.BYTES);
+                return LongPointable.getLong(offsetPointable.getByteArray(), offsetPointable.getStartOffset());
+            }
+        } catch (EOFException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public long getColumnFilterMax(int columnIndex) throws HyracksDataException {
+        try {
+            if (columnIndex < zerothSegmentMaxColumns) {
+                return zerothSegmentReader.getColumnFilterMax(columnIndex);
+            } else {
+                int segmentIndex = (columnIndex - zerothSegmentMaxColumns) / maxNumberOfColumnsInAPage;
+                int columnIndexInRequiredSegment = (columnIndex - zerothSegmentMaxColumns) % maxNumberOfColumnsInAPage;
+                int segmentOffset =
+                        findNumberOfColumnsInSegment(segmentIndex) * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE
+                                + columnIndexInRequiredSegment * DefaultColumnPageZeroWriter.FILTER_SIZE;
+                segmentOffset += Long.BYTES; // Move to the max value in the filter
+                segmentBuffers.read(segmentIndex, offsetPointable, segmentOffset, Long.BYTES);
+                return LongPointable.getLong(offsetPointable.getByteArray(), offsetPointable.getStartOffset());
+            }
+        } catch (EOFException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private int findNumberOfColumnsInSegment(int segmentIndex) {
+        // segmentIndex is 0-based over the segment stream, i.e., it excludes the zeroth page-zero segment
+        if (segmentIndex == numberOfPageZeroSegments - 2) {
+            return getNumberOfPresentColumns() - zerothSegmentMaxColumns
+                    - (numberOfPageZeroSegments - 2) * maxNumberOfColumnsInAPage;
+        }
+        // Every stream segment except the last (handled above) is fully packed with the per-page maximum number of columns.
+        return maxNumberOfColumnsInAPage;
+    }
+
+    @Override
+    public void skipFilters() {
+        zerothSegmentReader.skipFilters();
+    }
+
+    @Override
+    public void skipColumnOffsets() {
+        zerothSegmentReader.skipColumnOffsets();
+    }
+
+    @Override
+    public int getTupleCount() {
+        return pageZeroBuf.getInt(TUPLE_COUNT_OFFSET);
+    }
+
+    @Override
+    public int getLeftMostKeyOffset() {
+        return pageZeroBuf.getInt(LEFT_MOST_KEY_OFFSET);
+    }
+
+    @Override
+    public int getRightMostKeyOffset() {
+        return pageZeroBuf.getInt(RIGHT_MOST_KEY_OFFSET);
+    }
+
+    @Override
+    public int getNumberOfPresentColumns() {
+        return pageZeroBuf.getInt(NUMBER_OF_COLUMNS_OFFSET);
+    }
+
+    @Override
+    public int getRelativeColumnIndex(int columnIndex) {
+        return columnIndex;
+    }
+
+    @Override
+    public int getNextLeaf() {
+        return pageZeroBuf.getInt(NEXT_LEAF_OFFSET);
+    }
+
+    @Override
+    public int getMegaLeafNodeLengthInBytes() {
+        return pageZeroBuf.getInt(MEGA_LEAF_NODE_LENGTH);
+    }
+
+    @Override
+    public int getPageZeroCapacity() {
+        return pageZeroBuf.capacity();
+    }
+
+    @Override
+    public boolean isValidColumn(int columnIndex) {
+        return columnIndex < getNumberOfPresentColumns();
+    }
+
+    @Override
+    public void getAllColumns(BitSet presentColumns) {
+        int numberOfColumns = getNumberOfPresentColumns();
+        presentColumns.set(0, numberOfColumns);
+    }
+
+    @Override
+    public ByteBuffer getPageZeroBuf() {
+        return pageZeroBuf;
+    }
+
+    @Override
+    public void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
+        int columnOffsetStart = headerSize;
+        for (int i = 0; i < Math.min(offsetColumnIndexPairs.length, zerothSegmentMaxColumns); i++) {
+            // search in the 0th segment
+            int offset = pageZeroBuf.getInt(columnOffsetStart);
+            offsetColumnIndexPairs[i] = IntPairUtil.of(offset, i);
+            columnOffsetStart += DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+        }
+
+        if (offsetColumnIndexPairs.length > zerothSegmentMaxColumns) {
+            // read the rest of the columns from the segment stream
+            segmentBuffers.readOffset(offsetColumnIndexPairs, zerothSegmentMaxColumns, maxNumberOfColumnsInAPage);
+        }
+    }
+
+    @Override
+    public BitSet markRequiredPageSegments(BitSet projectedColumns, int pageZeroId, boolean markAll) {
+        pageZeroSegmentsPages.clear();
+        // The zeroth segment is never marked here, as page zero itself is always read
+        if (numberOfPageZeroSegments == 1 || markAll) {
+            // mark all segments as required
+            pageZeroSegmentsPages.set(1, numberOfPageZeroSegments);
+        } else {
+            // Iterate over the projected columns and mark the segments that contain them
+            int currentIndex = projectedColumns.nextSetBit(zerothSegmentMaxColumns);
+            while (currentIndex >= 0) {
+                int rangeEnd = projectedColumns.nextClearBit(currentIndex); // exclusive
+
+                int fromSegmentIndex = (currentIndex - zerothSegmentMaxColumns) / maxNumberOfColumnsInAPage + 1;
+                int toSegmentIndex = (rangeEnd - 1 - zerothSegmentMaxColumns) / maxNumberOfColumnsInAPage + 1;
+
+                if (fromSegmentIndex <= toSegmentIndex) {
+                    pageZeroSegmentsPages.set(fromSegmentIndex, toSegmentIndex + 1); // inclusive range
+                }
+
+                currentIndex = projectedColumns.nextSetBit(rangeEnd);
+            }
+        }
+
+        return pageZeroSegmentsPages;
+    }
+
+    @Override
+    public int getHeaderSize() {
+        return EXTENDED_HEADER_SIZE;
+    }
+}
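Past the zeroth segment, the reader above locates a column's entry with div/mod arithmetic over the per-page column capacity. A self-contained sketch of that mapping; the capacities below are assumed for illustration only:

    public class ColumnToSegmentMappingSketch {
        public static void main(String[] args) {
            int zerothSegmentMaxColumns = 1000;   // assumed capacity of the zeroth segment
            int maxColumnsInAPage = 6500;         // assumed capacity of each following segment
            int columnOffsetSize = Integer.BYTES; // 4-byte offset entry per column

            int columnIndex = 9000; // a column that spills past the zeroth segment

            if (columnIndex < zerothSegmentMaxColumns) {
                System.out.println("column lives in the zeroth segment");
            } else {
                // segmentIndex is 0-based over the segments after page zero itself.
                int segmentIndex = (columnIndex - zerothSegmentMaxColumns) / maxColumnsInAPage;
                int columnInSegment = (columnIndex - zerothSegmentMaxColumns) % maxColumnsInAPage;
                int segmentOffset = columnInSegment * columnOffsetSize;
                System.out.printf("segment %d, entry %d, byte offset %d%n",
                        segmentIndex, columnInSegment, segmentOffset);
            }
        }
    }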
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroWriter.java
new file mode 100644
index 0000000..7f16d5e
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroWriter.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.zero.writers.multipage;
+
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.FLAG_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.HEADER_SIZE;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.LEFT_MOST_KEY_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.MEGA_LEAF_NODE_LENGTH;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.NUMBER_OF_COLUMNS_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.RIGHT_MOST_KEY_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.SIZE_OF_COLUMNS_OFFSETS_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.TUPLE_COUNT_OFFSET;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.asterix.column.bytes.stream.out.MultiPersistentPageZeroBufferBytesOutputStream;
+import org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IValuesWriter;
+
+/*
+[ PageZero Segment 0 ]
+──────────────────────────────────────────────────────────────────────────────
+| Headers                                                                    |
+| ───────────────────────────────────────────────────────────────────────── |
+| TupleCountOffset                                                           |
+| MaxColumnsInZerothSegment                                                  |
+| LevelOffset                                                                |
+| NumberOfColumnsOffset                                                      |
+| LeftMostKeyOffset                                                          |
+| RightMostKeyOffset                                                         |
+| SizeOfColumnsOffsetsOffset                                                 |
+| MegaLeafNodeLength                                                         |
+| FlagOffset                                                                 |
+| NextLeafOffset                                                             |
+| NumberOfPageSegments                                                       |
+| MaxColumnsInPageZeroSegment                                                |
+
+| Min Primary Key                                                            |
+| Max Primary Key                                                            |
+| Primary Key Values                                                         |
+| [ offset₁, min₁, max₁ ]                                                   |
+| [ offset₂, min₂, max₂ ]                                                   |
+| [ offset₃, min₃, max₃ ]                                                   |
+| ...                                                                        |
+
+[ PageZero Segment 1..N ]
+──────────────────────────────────────────────────────────────────────────────
+| Additional column metadata (same format)                                   |
+*/
+public class DefaultColumnMultiPageZeroWriter implements IColumnPageZeroWriter {
+    // extended header fields: the number of page-zero segments, followed by the max columns allowed in the zeroth segment
+    public static final int NUMBER_OF_PAGE_ZERO_SEGMENTS_OFFSET = HEADER_SIZE;
+    public static final int MAX_COLUMNS_IN_ZEROTH_SEGMENT = HEADER_SIZE + Integer.BYTES;
+    public static final int EXTENDED_HEADER_SIZE = MAX_COLUMNS_IN_ZEROTH_SEGMENT + Integer.BYTES;
+
+    private final MultiPersistentPageZeroBufferBytesOutputStream segments;
+    private final DefaultColumnPageZeroWriter zerothSegmentWriter;
+    // maximum number of columns that can be laid out in the zeroth segment
+    private final int zerothSegmentMaxColumns;
+    private final int maximumNumberOfColumnsInAPage; // this is the maximum number of columns that can be laid out in a page
+
+    private int numberOfColumns;
+    private int numberOfColumnInZerothSegment;
+    private int numberOfPageZeroSegments; // this includes the zeroth segment
+
+    public DefaultColumnMultiPageZeroWriter(IColumnWriteMultiPageOp multiPageOp, int zerothSegmentMaxColumns,
+            int bufferCapacity) {
+        Mutable<IColumnWriteMultiPageOp> multiPageOpRef = new MutableObject<>();
+        multiPageOpRef.setValue(multiPageOp);
+        segments = new MultiPersistentPageZeroBufferBytesOutputStream(multiPageOpRef); // should this be populated at reset?
+        this.zerothSegmentWriter = new DefaultColumnPageZeroWriter();
+        this.zerothSegmentMaxColumns = zerothSegmentMaxColumns;
+        this.maximumNumberOfColumnsInAPage = bufferCapacity
+                / (DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
+    }
+
+    @Override
+    public void resetBasedOnColumns(int[] presentColumns, int numberOfColumns) throws HyracksDataException {
+        this.numberOfColumns = numberOfColumns;
+        this.numberOfColumnInZerothSegment = Math.min(numberOfColumns, zerothSegmentMaxColumns);
+        this.numberOfPageZeroSegments = calculateNumberOfPageZeroSegments(numberOfColumns,
+                numberOfColumnInZerothSegment, maximumNumberOfColumnsInAPage);
+        zerothSegmentWriter.resetBasedOnColumns(presentColumns, numberOfColumnInZerothSegment, EXTENDED_HEADER_SIZE);
+        if (numberOfPageZeroSegments > 1) {
+            // this many buffers must be allocated up front so the segments get contiguous pageIds
+            segments.reset(numberOfPageZeroSegments - 1);
+        }
+    }
+
+    @Override
+    public void resetBasedOnColumns(int[] presentColumns, int numberOfColumns, int headerSize)
+            throws HyracksDataException {
+        throw new UnsupportedOperationException(
+                "resetBasedOnColumns with headerSize is not supported in multi-page zero writer");
+    }
+
+    private int calculateNumberOfPageZeroSegments(int numberOfColumns, int numberOfColumnInZerothSegment,
+            int maximumNumberOfColumnsInAPage) {
+        // calculate the number of segments required to store the columns
+        int numberOfColumnsBeyondZerothSegment = numberOfColumns - numberOfColumnInZerothSegment;
+        if (numberOfColumnsBeyondZerothSegment <= 0) {
+            return 1; // only zeroth segment is needed
+        }
+        return 1 + (int) Math.ceil((double) numberOfColumnsBeyondZerothSegment / maximumNumberOfColumnsInAPage);
+    }
+
+    @Override
+    public void allocateColumns() {
+        // allocate the zeroth segment columns
+        zerothSegmentWriter.allocateColumns();
+        // the remaining segments do not need to be allocated here,
+        // as they are already fully packed with columns
+    }
+
+    @Override
+    public void putColumnOffset(int columnIndex, int relativeColumnIndex, int offset) throws HyracksDataException {
+        try {
+            // for default writer, both columnIndex and relativeColumnIndex are the same
+            if (columnIndex < zerothSegmentMaxColumns) {
+                zerothSegmentWriter.putColumnOffset(columnIndex, relativeColumnIndex, offset);
+            } else {
+                // For columns beyond the zeroth segment, we need to write to the segments
+                int columnIndexInSegment = columnIndex - numberOfColumnInZerothSegment;
+                int requiredSegment = columnIndexInSegment / maximumNumberOfColumnsInAPage;
+                int columnIndexInRequiredSegment = columnIndexInSegment % maximumNumberOfColumnsInAPage;
+                int offsetInSegment = columnIndexInRequiredSegment * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+                segments.writeInSegment(requiredSegment, offsetInSegment, offset);
+            }
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public void putColumnFilter(int columnIndex, long normalizedMinValue, long normalizedMaxValue)
+            throws HyracksDataException {
+        try {
+            if (columnIndex < zerothSegmentMaxColumns) {
+                zerothSegmentWriter.putColumnFilter(columnIndex, normalizedMinValue, normalizedMaxValue);
+            } else {
+                // For columns beyond the zeroth segment, we need to write to the segments
+                int columnIndexInSegment = columnIndex - numberOfColumnInZerothSegment;
+                int requiredSegment = columnIndexInSegment / maximumNumberOfColumnsInAPage;
+                int columnIndexInRequiredSegment = columnIndexInSegment % maximumNumberOfColumnsInAPage;
+                int numberOfColumnsInSegment = findNumberOfColumnsInSegment(requiredSegment);
+                int segmentFilterOffset = numberOfColumnsInSegment * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+                int offsetInSegment =
+                        segmentFilterOffset + columnIndexInRequiredSegment * DefaultColumnPageZeroWriter.FILTER_SIZE;
+                segments.writeInSegment(requiredSegment, offsetInSegment, normalizedMinValue);
+                segments.writeInSegment(requiredSegment, offsetInSegment + Long.BYTES, normalizedMaxValue);
+            }
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private int findNumberOfColumnsInSegment(int segmentIndex) {
+        // segmentIndex is 0-based over the segment stream, i.e., it excludes the zeroth page-zero segment
+        if (segmentIndex == numberOfPageZeroSegments - 2) {
+            return numberOfColumns - numberOfColumnInZerothSegment
+                    - (numberOfPageZeroSegments - 2) * maximumNumberOfColumnsInAPage;
+        }
+        // Every stream segment except the last (handled above) is fully packed with the per-page maximum number of columns.
+        return maximumNumberOfColumnsInAPage;
+    }
+
+    @Override
+    public void writePrimaryKeyColumns(IValuesWriter[] primaryKeyWriters) throws HyracksDataException {
+        // primary key columns are always written to the zeroth segment
+        zerothSegmentWriter.writePrimaryKeyColumns(primaryKeyWriters);
+    }
+
+    @Override
+    public byte flagCode() {
+        return MULTI_PAGE_DEFAULT_WRITER_FLAG;
+    }
+
+    @Override
+    public void flush(ByteBuffer buf, int numberOfTuples, ITupleReference minKey, ITupleReference maxKey,
+            AbstractColumnTupleWriter columnWriter, ITreeIndexTupleWriter rowTupleWriter) throws HyracksDataException {
+        buf.position(EXTENDED_HEADER_SIZE);
+        zerothSegmentWriter.setPageZero(buf);
+        buf.putInt(MEGA_LEAF_NODE_LENGTH, columnWriter.flush(this));
+        // Write min and max keys
+        int offset = buf.position();
+        buf.putInt(LEFT_MOST_KEY_OFFSET, offset);
+        offset += rowTupleWriter.writeTuple(minKey, buf.array(), offset);
+        buf.putInt(RIGHT_MOST_KEY_OFFSET, offset);
+        rowTupleWriter.writeTuple(maxKey, buf.array(), offset);
+
+        // Write page information
+        buf.putInt(TUPLE_COUNT_OFFSET, numberOfTuples);
+        buf.put(FLAG_OFFSET, flagCode());
+        buf.putInt(NUMBER_OF_COLUMNS_OFFSET, getNumberOfColumns());
+        buf.putInt(SIZE_OF_COLUMNS_OFFSETS_OFFSET, getColumnOffsetsSize());
+        // write the number of segments
+        buf.putInt(NUMBER_OF_PAGE_ZERO_SEGMENTS_OFFSET, numberOfPageZeroSegments);
+        // write the number of columns in the zeroth segment
+        buf.putInt(MAX_COLUMNS_IN_ZEROTH_SEGMENT, zerothSegmentMaxColumns);
+
+        // reset the collected meta info
+        segments.finish();
+        columnWriter.reset();
+    }
+
+    @Override
+    public int getNumberOfColumns() {
+        return numberOfColumns;
+    }
+
+    @Override
+    public boolean includeOrderedColumn(BitSet presentColumns, int columnIndex, boolean includeChildrenColumns) {
+        return true;
+    }
+
+    @Override
+    public int getPageZeroBufferCapacity() {
+        int pageSize = zerothSegmentWriter.getPageZeroBufferCapacity();
+        return pageSize * numberOfPageZeroSegments;
+    }
+
+    @Override
+    public int getRelativeColumnIndex(int columnIndex) {
+        return columnIndex;
+    }
+
+    @Override
+    public int getColumnOffsetsSize() {
+        return numberOfColumns * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+    }
+
+    @Override
+    public void setPageZero(ByteBuffer pageZero) {
+        throw new IllegalStateException("setPageZero is not supported in multi-page zero writer");
+    }
+
+    @Override
+    public int getHeaderSize() {
+        return EXTENDED_HEADER_SIZE;
+    }
+}
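The writer above sizes the page-zero chain with a ceiling division over the per-page column capacity. A worked sketch of that arithmetic; the page size and zeroth-segment budget are assumed values, not the engine's configuration:

    public class PageZeroSegmentCountSketch {
        public static void main(String[] args) {
            int bufferCapacity = 128 * 1024;                    // assumed page size in bytes
            int entrySize = Integer.BYTES + 2 * Long.BYTES;     // 4-byte offset + 16-byte min/max filter
            int maxColumnsInAPage = bufferCapacity / entrySize; // 131072 / 20 = 6553

            int numberOfColumns = 20000;
            int zerothSegmentMaxColumns = 1000;                 // assumed budget for the zeroth segment

            int beyondZeroth = numberOfColumns - Math.min(numberOfColumns, zerothSegmentMaxColumns);
            int segments = beyondZeroth <= 0
                    ? 1
                    : 1 + (int) Math.ceil((double) beyondZeroth / maxColumnsInAPage);
            // 19000 columns spill past the zeroth segment -> ceil(19000 / 6553) = 3 extra segments.
            System.out.println("page zero segments (including the zeroth): " + segments); // 4
        }
    }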
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroReader.java
new file mode 100644
index 0000000..d555bdc
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroReader.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.zero.writers.multipage;
+
+import static org.apache.asterix.column.zero.writers.multipage.DefaultColumnMultiPageZeroWriter.MAX_COLUMNS_IN_ZEROTH_SEGMENT;
+import static org.apache.asterix.column.zero.writers.multipage.DefaultColumnMultiPageZeroWriter.NUMBER_OF_PAGE_ZERO_SEGMENTS_OFFSET;
+import static org.apache.asterix.column.zero.writers.multipage.SparseColumnMultiPageZeroWriter.MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.LEFT_MOST_KEY_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.MEGA_LEAF_NODE_LENGTH;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.NEXT_LEAF_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.NUMBER_OF_COLUMNS_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.RIGHT_MOST_KEY_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.TUPLE_COUNT_OFFSET;
+
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.asterix.column.zero.readers.SparseColumnPageZeroReader;
+import org.apache.asterix.column.zero.writers.SparseColumnPageZeroWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IntPairUtil;
+
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+
+public class SparseColumnMultiPageZeroReader extends AbstractColumnMultiPageZeroReader {
+    private final SparseColumnPageZeroReader zerothSegmentReader;
+    private final int maxNumberOfColumnsInAPage;
+    private final BitSet pageZeroSegmentsPages;
+    private final Int2IntOpenHashMap columnIndexToRelativeColumnIndex;
+
+    private int maxColumnIndexInZerothSegment;
+    private int numberOfColumnInZerothSegment;
+    private int numberOfPageZeroSegments;
+    private int headerSize;
+    private ByteBuffer pageZeroBuf;
+
+    private final VoidPointable offsetPointable;
+
+    public SparseColumnMultiPageZeroReader(int bufferCapacity) {
+        super();
+        zerothSegmentReader = new SparseColumnPageZeroReader();
+        this.pageZeroSegmentsPages = new BitSet();
+        this.maxNumberOfColumnsInAPage = bufferCapacity
+                / (SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + SparseColumnPageZeroWriter.FILTER_SIZE);
+        this.offsetPointable = new VoidPointable();
+        this.columnIndexToRelativeColumnIndex = new Int2IntOpenHashMap();
+        columnIndexToRelativeColumnIndex.defaultReturnValue(-1);
+    }
+
+    @Override
+    public void resetStream(IColumnBufferProvider pageZeroSegmentBufferProvider) throws HyracksDataException {
+        segmentBuffers.reset(pageZeroSegmentBufferProvider);
+    }
+
+    @Override
+    public void reset(ByteBuffer pageZeroBuf) {
+        this.pageZeroBuf = pageZeroBuf;
+        numberOfPageZeroSegments = pageZeroBuf.getInt(NUMBER_OF_PAGE_ZERO_SEGMENTS_OFFSET);
+        numberOfColumnInZerothSegment = pageZeroBuf.getInt(MAX_COLUMNS_IN_ZEROTH_SEGMENT);
+        maxColumnIndexInZerothSegment = pageZeroBuf.getInt(MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET);
+        headerSize = MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET + numberOfPageZeroSegments * Integer.BYTES;
+        zerothSegmentReader.reset(pageZeroBuf, Math.min(numberOfColumnInZerothSegment, getNumberOfPresentColumns()),
+                headerSize);
+        columnIndexToRelativeColumnIndex.clear();
+    }
+
+    @Override
+    public void reset(ByteBuffer pageZeroBuf, int headerSize) {
+        throw new UnsupportedOperationException("This method is not supported for multi-page zero readers.");
+    }
+
+    @Override
+    public int getColumnOffset(int columnIndex) throws HyracksDataException {
+        try {
+            if (columnIndex <= maxColumnIndexInZerothSegment) {
+                return zerothSegmentReader.getColumnOffset(columnIndex);
+            } else {
+                int segmentIndex = findSegment(columnIndex) - 1;
+                int relativeColumnIndex = findRelativeColumnIndex(columnIndex);
+                int columnIndexInRequiredSegment =
+                        (relativeColumnIndex - numberOfColumnInZerothSegment) % maxNumberOfColumnsInAPage;
+                int segmentOffset =
+                        columnIndexInRequiredSegment * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + Integer.BYTES; // skipping 4 bytes of columnIndex
+                segmentBuffers.read(segmentIndex, offsetPointable, segmentOffset,
+                        SparseColumnPageZeroWriter.COLUMN_INDEX_SIZE);
+                return IntegerPointable.getInteger(offsetPointable.getByteArray(), offsetPointable.getStartOffset());
+            }
+        } catch (EOFException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private int findSegment(int columnIndex) {
+        // Finds the 0-based page-zero segment whose column range covers columnIndex (0 = zeroth segment), or -1 if no such segment exists.
+        if (numberOfPageZeroSegments == 1) {
+            // only zeroth segment is present
+            return -1;
+        }
+        // gives 0 based segment index (0 for zeroth segment, 1 for first segment, etc.)
+        if (columnIndex <= maxColumnIndexInZerothSegment) {
+            return 0;
+        } else {
+            int start = 0;
+            int end = numberOfPageZeroSegments - 1;
+            int resultSegment = -1;
+            while (start <= end) {
+                int mid = (start + end) / 2;
+                int segmentColumnIndex =
+                        pageZeroBuf.getInt(MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET + mid * Integer.BYTES);
+                if (segmentColumnIndex >= columnIndex) {
+                    resultSegment = mid;
+                    end = mid - 1; // continue searching in the left half
+                } else {
+                    start = mid + 1;
+                }
+            }
+            return resultSegment;
+        }
+    }
+
+    private int findRelativeColumnIndex(int columnIndex) throws HyracksDataException {
+        if (columnIndexToRelativeColumnIndex.get(columnIndex) != -1) {
+            return columnIndexToRelativeColumnIndex.get(columnIndex);
+        }
+        if (columnIndex <= maxColumnIndexInZerothSegment) {
+            return zerothSegmentReader.getRelativeColumnIndex(columnIndex);
+        } else {
+            int segmentIndex = findSegment(columnIndex);
+            if (segmentIndex == -1) {
+                return -1;
+            }
+            // findSegment returns a page-zero segment index; the segment stream excludes the zeroth
+            // segment, so subtract one to get the 0-based buffer index within the stream.
+            segmentIndex -= 1;
+            int numberOfColumnsInSegment =
+                    segmentIndex == numberOfPageZeroSegments - 2
+                            ? getNumberOfPresentColumns() - numberOfColumnInZerothSegment
+                                    - (numberOfPageZeroSegments - 2) * maxNumberOfColumnsInAPage
+                            : maxNumberOfColumnsInAPage;
+            int segmentColumnIndex =
+                    segmentBuffers.findColumnIndexInSegment(segmentIndex, columnIndex, numberOfColumnsInSegment);
+            if (segmentColumnIndex == -1) {
+                return -1;
+            }
+            int relativeIndex =
+                    numberOfColumnInZerothSegment + segmentIndex * maxNumberOfColumnsInAPage + segmentColumnIndex;
+            columnIndexToRelativeColumnIndex.put(columnIndex, relativeIndex);
+            return relativeIndex;
+        }
+    }
+
+    private int findNumberOfColumnsInSegment(int segmentIndex) {
+        // segmentIndex is 0-based over the segment stream, i.e., it excludes the zeroth page-zero segment
+        if (segmentIndex == numberOfPageZeroSegments - 2) {
+            return getNumberOfPresentColumns() - numberOfColumnInZerothSegment
+                    - (numberOfPageZeroSegments - 2) * maxNumberOfColumnsInAPage;
+        }
+        // Every stream segment except the last (handled above) is fully packed with the per-page maximum number of columns.
+        return maxNumberOfColumnsInAPage;
+    }
+
+    @Override
+    public long getColumnFilterMin(int columnIndex) throws HyracksDataException {
+        try {
+            if (columnIndex <= maxColumnIndexInZerothSegment) {
+                return zerothSegmentReader.getColumnFilterMin(columnIndex);
+            } else {
+                int segmentIndex = findSegment(columnIndex) - 1;
+                int relativeColumnIndex = findRelativeColumnIndex(columnIndex);
+                int columnIndexInRequiredSegment =
+                        (relativeColumnIndex - numberOfColumnInZerothSegment) % maxNumberOfColumnsInAPage;
+                int segmentOffset =
+                        findNumberOfColumnsInSegment(segmentIndex) * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+                segmentOffset += columnIndexInRequiredSegment * SparseColumnPageZeroWriter.FILTER_SIZE;
+                segmentBuffers.read(segmentIndex, offsetPointable, segmentOffset, Long.BYTES);
+                return LongPointable.getLong(offsetPointable.getByteArray(), offsetPointable.getStartOffset());
+            }
+        } catch (EOFException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public long getColumnFilterMax(int columnIndex) throws HyracksDataException {
+        try {
+            if (columnIndex <= maxColumnIndexInZerothSegment) {
+                return zerothSegmentReader.getColumnFilterMax(columnIndex);
+            } else {
+                int segmentIndex = findSegment(columnIndex) - 1;
+                int relativeColumnIndex = findRelativeColumnIndex(columnIndex);
+                int columnIndexInRequiredSegment =
+                        (relativeColumnIndex - numberOfColumnInZerothSegment) % maxNumberOfColumnsInAPage;
+                int segmentOffset =
+                        findNumberOfColumnsInSegment(segmentIndex) * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+                segmentOffset += columnIndexInRequiredSegment * SparseColumnPageZeroWriter.FILTER_SIZE;
+                segmentOffset += Long.BYTES; // skip min filter
+                segmentBuffers.read(segmentIndex, offsetPointable, segmentOffset, Long.BYTES);
+                return LongPointable.getLong(offsetPointable.getByteArray(), offsetPointable.getStartOffset());
+            }
+        } catch (EOFException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public void skipFilters() {
+        zerothSegmentReader.skipFilters();
+    }
+
+    @Override
+    public void skipColumnOffsets() {
+        zerothSegmentReader.skipColumnOffsets();
+    }
+
+    @Override
+    public int getTupleCount() {
+        return pageZeroBuf.getInt(TUPLE_COUNT_OFFSET);
+    }
+
+    @Override
+    public int getLeftMostKeyOffset() {
+        return pageZeroBuf.getInt(LEFT_MOST_KEY_OFFSET);
+    }
+
+    @Override
+    public int getRightMostKeyOffset() {
+        return pageZeroBuf.getInt(RIGHT_MOST_KEY_OFFSET);
+    }
+
+    @Override
+    public int getNumberOfPresentColumns() {
+        return pageZeroBuf.getInt(NUMBER_OF_COLUMNS_OFFSET);
+    }
+
+    @Override
+    public int getRelativeColumnIndex(int columnIndex) throws HyracksDataException {
+        return findRelativeColumnIndex(columnIndex);
+    }
+
+    @Override
+    public int getNextLeaf() {
+        return pageZeroBuf.getInt(NEXT_LEAF_OFFSET);
+    }
+
+    @Override
+    public int getMegaLeafNodeLengthInBytes() {
+        return pageZeroBuf.getInt(MEGA_LEAF_NODE_LENGTH);
+    }
+
+    @Override
+    public int getPageZeroCapacity() {
+        return pageZeroBuf.capacity();
+    }
+
+    @Override
+    public boolean isValidColumn(int columnIndex) throws HyracksDataException {
+        return findRelativeColumnIndex(columnIndex) != -1;
+    }
+
+    @Override
+    public void getAllColumns(BitSet presentColumns) {
+        int columnOffsetStart = headerSize;
+        for (int i = 0; i < Math.min(getNumberOfPresentColumns(), numberOfColumnInZerothSegment); i++) {
+            int columnIndex = pageZeroBuf.getInt(columnOffsetStart);
+            presentColumns.set(columnIndex);
+            columnOffsetStart += SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+        }
+        if (getNumberOfPresentColumns() > numberOfColumnInZerothSegment) {
+            // read the rest of the columns from the segment stream
+            int columnsInLastSegment = getNumberOfPresentColumns() - numberOfColumnInZerothSegment
+                    - (numberOfPageZeroSegments - 2) * maxNumberOfColumnsInAPage;
+            segmentBuffers.readAllColumns(presentColumns, numberOfPageZeroSegments, maxNumberOfColumnsInAPage,
+                    columnsInLastSegment);
+        }
+    }
+
+    @Override
+    public ByteBuffer getPageZeroBuf() {
+        throw new UnsupportedOperationException("This method is not supported for multi-page zero readers.");
+    }
+
+    @Override
+    public void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
+        // OffsetColumnIndexPairs is of size getNumberOfPresentColumns() + 1
+        int columnOffsetStart = headerSize;
+        for (int i = 0; i < Math.min(offsetColumnIndexPairs.length - 1, numberOfColumnInZerothSegment); i++) {
+            int columnIndex = pageZeroBuf.getInt(columnOffsetStart);
+            int columnOffset = pageZeroBuf.getInt(columnOffsetStart + SparseColumnPageZeroWriter.COLUMN_INDEX_SIZE);
+            offsetColumnIndexPairs[i] = IntPairUtil.of(columnOffset, columnIndex);
+            columnOffsetStart += SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+        }
+
+        if (offsetColumnIndexPairs.length - 1 > numberOfColumnInZerothSegment) {
+            // read the rest of the columns from the segment stream
+            int columnsInLastSegment = getNumberOfPresentColumns() - numberOfColumnInZerothSegment
+                    - (numberOfPageZeroSegments - 2) * maxNumberOfColumnsInAPage;
+            segmentBuffers.readSparseOffset(offsetColumnIndexPairs, numberOfPageZeroSegments, maxNumberOfColumnsInAPage,
+                    columnsInLastSegment);
+        }
+    }
+
+    @Override
+    public int getNumberOfPageZeroSegments() {
+        return numberOfPageZeroSegments;
+    }
+
+    @Override
+    public BitSet getPageZeroSegmentsPages() {
+        return pageZeroSegmentsPages;
+    }
+
+    @Override
+    public int getHeaderSize() {
+        return headerSize;
+    }
+
+    @Override
+    public BitSet markRequiredPageSegments(BitSet projectedColumns, int pageZeroId, boolean markAll) {
+        pageZeroSegmentsPages.clear();
+        // The zeroth segment is never marked here, as page zero itself is always read
+        if (numberOfPageZeroSegments == 1 || markAll) {
+            // mark all segments as required
+            pageZeroSegmentsPages.set(1, numberOfPageZeroSegments);
+        } else {
+            // Iterate over the projected columns and mark the segments that contain them
+            int currentIndex = projectedColumns.nextSetBit(maxColumnIndexInZerothSegment + 1);
+            while (currentIndex >= 0) {
+                int rangeEnd = projectedColumns.nextClearBit(currentIndex); // exclusive
+                int startSegmentIndex = findSegment(currentIndex);
+                int endSegmentIndex = findSegment(rangeEnd - 1);
+
+                if (startSegmentIndex <= endSegmentIndex) {
+                    pageZeroSegmentsPages.set(startSegmentIndex, endSegmentIndex + 1);
+                }
+
+                currentIndex = projectedColumns.nextSetBit(rangeEnd);
+            }
+        }
+        return pageZeroSegmentsPages;
+    }
+}
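In the sparse case, findSegment above binary-searches the per-segment maximum column indices kept in the extended header for the first segment whose maximum covers the requested column. A standalone sketch of that search over a plain array, with illustrative values:

    public class SparseSegmentLookupSketch {
        // maxColumnIndexPerSegment[i] is the largest column index stored in segment i
        // (segment 0 is page zero itself). Returns the first segment whose maximum
        // covers columnIndex, or -1 if the column is beyond every segment.
        static int findSegment(int[] maxColumnIndexPerSegment, int columnIndex) {
            int start = 0;
            int end = maxColumnIndexPerSegment.length - 1;
            int result = -1;
            while (start <= end) {
                int mid = (start + end) / 2;
                if (maxColumnIndexPerSegment[mid] >= columnIndex) {
                    result = mid;
                    end = mid - 1; // keep looking for an earlier segment that still covers it
                } else {
                    start = mid + 1;
                }
            }
            return result;
        }

        public static void main(String[] args) {
            int[] maxColumnIndexPerSegment = { 120, 540, 910, 1300 };
            System.out.println(findSegment(maxColumnIndexPerSegment, 600));  // 2
            System.out.println(findSegment(maxColumnIndexPerSegment, 1400)); // -1
        }
    }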
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroWriter.java
new file mode 100644
index 0000000..695ee6e
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroWriter.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.zero.writers.multipage;
+
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.FLAG_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.HEADER_SIZE;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.LEFT_MOST_KEY_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.MEGA_LEAF_NODE_LENGTH;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.NUMBER_OF_COLUMNS_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.RIGHT_MOST_KEY_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.SIZE_OF_COLUMNS_OFFSETS_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.TUPLE_COUNT_OFFSET;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.asterix.column.bytes.stream.out.MultiPersistentPageZeroBufferBytesOutputStream;
+import org.apache.asterix.column.zero.writers.SparseColumnPageZeroWriter;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IValuesWriter;
+
+/*
+[ PageZero Segment 0 ]
+──────────────────────────────────────────────────────────────────────────────
+| Headers                                                                    |
+| ───────────────────────────────────────────────────────────────────────── |
+| TupleCountOffset                                                           |
+| MaxColumnsInZerothSegment                                                  |
+| LevelOffset                                                                |
+| NumberOfColumnsOffset                                                      |
+| LeftMostKeyOffset                                                          |
+| RightMostKeyOffset                                                         |
+| SizeOfColumnsOffsetsOffset                                                 |
+| MegaLeafNodeLength                                                         |
+| FlagOffset                                                                 |
+| NextLeafOffset                                                             |
+| NumberOfPageSegments                                                       |
+| MaxColumnIndexInZerothSegment                                              |
+| MaxColumnIndexInFirstSegment                                               |
+| MaxColumnIndexInSecondSegment                                              |
+| ...                                                                        |
+
+| Min Primary Key                                                            |
+| Max Primary Key                                                            |
+| Primary Key Values                                                         |
+| [ offset₁, min₁, max₁ ]                                                   |
+| [ offset₂, min₂, max₂ ]                                                   |
+| [ offset₃, min₃, max₃ ]                                                   |
+| ...                                                                        |
+
+[ PageZero Segment 1..N ]
+──────────────────────────────────────────────────────────────────────────────
+| Additional column metadata (same format)                                   |
+*/
+public class SparseColumnMultiPageZeroWriter implements IColumnPageZeroWriter {
+    // For storing the last columnIndex in the i-th segment
+    public static final int NUMBER_OF_PAGE_ZERO_SEGMENTS_OFFSET = HEADER_SIZE;
+    public static final int MAX_COLUMNS_IN_ZEROTH_SEGMENT_OFFSET = NUMBER_OF_PAGE_ZERO_SEGMENTS_OFFSET + 4;
+    public static final int MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET = MAX_COLUMNS_IN_ZEROTH_SEGMENT_OFFSET + 4;
+
+    private final MultiPersistentPageZeroBufferBytesOutputStream segments;
+    private final SparseColumnPageZeroWriter zerothSegmentWriter;
+    private final int maximumNumberOfColumnsInAPage;
+    private final int zerothSegmentMaxColumns;
+    private int[] presentColumns;
+    private int numberOfPresentColumns;
+    private int numberOfPageZeroSegments;
+    private int numberOfColumnInZerothSegment;
+
+    public SparseColumnMultiPageZeroWriter(IColumnWriteMultiPageOp multiPageOp, int zerothSegmentMaxColumns,
+            int bufferCachePageSize) {
+        Mutable<IColumnWriteMultiPageOp> multiPageOpRef = new MutableObject<>();
+        multiPageOpRef.setValue(multiPageOp);
+        segments = new MultiPersistentPageZeroBufferBytesOutputStream(multiPageOpRef);
+        this.zerothSegmentMaxColumns = zerothSegmentMaxColumns;
+        this.zerothSegmentWriter = new SparseColumnPageZeroWriter();
+        this.maximumNumberOfColumnsInAPage = bufferCachePageSize
+                / (SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + SparseColumnPageZeroWriter.FILTER_SIZE);
+    }
+
+    @Override
+    public void resetBasedOnColumns(int[] presentColumns, int numberOfColumns) throws HyracksDataException {
+        this.presentColumns = presentColumns;
+        this.numberOfPresentColumns = presentColumns.length;
+        this.numberOfColumnInZerothSegment = Math.min(numberOfPresentColumns, zerothSegmentMaxColumns);
+        this.numberOfPageZeroSegments = calculateNumberOfPageZeroSegments(numberOfPresentColumns,
+                numberOfColumnInZerothSegment, maximumNumberOfColumnsInAPage);
+        int headerSize = MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET + numberOfPageZeroSegments * Integer.BYTES;
+        zerothSegmentWriter.resetInnerBasedOnColumns(presentColumns, numberOfColumnInZerothSegment, headerSize);
+        if (numberOfPageZeroSegments > 1) {
+            segments.reset(numberOfPageZeroSegments - 1);
+        }
+    }
+
+    @Override
+    public void resetBasedOnColumns(int[] presentColumns, int numberOfColumns, int headerSize)
+            throws HyracksDataException {
+        throw new UnsupportedOperationException(
+                "resetBasedOnColumns with headerSize is not supported in multi-page zero writer");
+    }
+
+    @Override
+    public byte flagCode() {
+        return MULTI_PAGE_SPARSE_WRITER_FLAG;
+    }
+
+    private int calculateNumberOfPageZeroSegments(int numberOfColumns, int numberOfColumnInZerothSegment,
+            int maximumNumberOfColumnsInAPage) {
+        // calculate the number of segments required to store the columns
+        int numberOfColumnsBeyondZerothSegment = numberOfColumns - numberOfColumnInZerothSegment;
+        if (numberOfColumnsBeyondZerothSegment <= 0) {
+            return 1; // only zeroth segment is needed
+        }
+        return 1 + (int) Math.ceil((double) numberOfColumnsBeyondZerothSegment / maximumNumberOfColumnsInAPage);
+    }
+
+    @Override
+    public void allocateColumns() {
+        // allocate the zeroth segment columns
+        zerothSegmentWriter.allocateColumns();
+    }
+
+    @Override
+    public void putColumnOffset(int absoluteColumnIndex, int relativeColumnIndex, int offset)
+            throws HyracksDataException {
+        // for sparse writer, we need to find the relative column index in the present columns.
+        try {
+            if (relativeColumnIndex < zerothSegmentMaxColumns) {
+                // Write to the zeroth segment
+                zerothSegmentWriter.putColumnOffset(absoluteColumnIndex, relativeColumnIndex, offset);
+            } else {
+                int columnIndexInSegment = relativeColumnIndex - numberOfColumnInZerothSegment;
+                int requiredSegment = columnIndexInSegment / maximumNumberOfColumnsInAPage;
+                int columnIndexInRequiredSegment = columnIndexInSegment % maximumNumberOfColumnsInAPage;
+                int offsetInSegment = columnIndexInRequiredSegment * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+                segments.writeInSegment(requiredSegment, offsetInSegment, absoluteColumnIndex, offset);
+            }
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public void putColumnFilter(int relativeColumnIndex, long normalizedMinValue, long normalizedMaxValue)
+            throws HyracksDataException {
+        try {
+            if (relativeColumnIndex < zerothSegmentMaxColumns) {
+                zerothSegmentWriter.putColumnFilter(relativeColumnIndex, normalizedMinValue, normalizedMaxValue);
+            } else {
+                // For columns beyond the zeroth segment, we need to write to the segments
+                int columnIndexInSegment = relativeColumnIndex - numberOfColumnInZerothSegment;
+                int requiredSegment = columnIndexInSegment / maximumNumberOfColumnsInAPage;
+                int columnIndexInRequiredSegment = columnIndexInSegment % maximumNumberOfColumnsInAPage;
+                int numberOfColumnsInSegment = findNumberOfColumnsInSegment(requiredSegment);
+                int segmentFilterOffset = numberOfColumnsInSegment * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+                int offsetInSegment =
+                        segmentFilterOffset + columnIndexInRequiredSegment * SparseColumnPageZeroWriter.FILTER_SIZE;
+                segments.writeInSegment(requiredSegment, offsetInSegment, normalizedMinValue);
+                segments.writeInSegment(requiredSegment, offsetInSegment + Long.BYTES, normalizedMaxValue);
+            }
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private int findNumberOfColumnsInSegment(int segmentIndex) {
+        // segmentIndex is relative to the first segment after the zeroth (0 here means the 1st segment)
+        if (segmentIndex == numberOfPageZeroSegments - 2) {
+            return numberOfPresentColumns - numberOfColumnInZerothSegment
+                    - (numberOfPageZeroSegments - 2) * maximumNumberOfColumnsInAPage;
+        }
+        // Segments after the zeroth hold the maximum number of columns per page; only the last segment may hold fewer.
+        return maximumNumberOfColumnsInAPage;
+    }
+
+    @Override
+    public void writePrimaryKeyColumns(IValuesWriter[] primaryKeyWriters) throws HyracksDataException {
+        zerothSegmentWriter.writePrimaryKeyColumns(primaryKeyWriters);
+    }
+
+    @Override
+    public int getNumberOfColumns() {
+        return numberOfPresentColumns;
+    }
+
+    @Override
+    public boolean includeOrderedColumn(BitSet presentColumns, int columnIndex, boolean includeChildrenColumns) {
+        return zerothSegmentWriter.includeOrderedColumn(presentColumns, columnIndex, includeChildrenColumns);
+    }
+
+    @Override
+    public int getPageZeroBufferCapacity() {
+        int pageSize = zerothSegmentWriter.getPageZeroBufferCapacity();
+        return pageSize * numberOfPageZeroSegments;
+    }
+
+    @Override
+    public int getRelativeColumnIndex(int columnIndex) {
+        int relativeColumnIndex =
+                zerothSegmentWriter.findColumnIndex(presentColumns, numberOfPresentColumns, columnIndex);
+        if (relativeColumnIndex == -1) {
+            throw new IllegalStateException("Column index " + columnIndex + " is out of bounds");
+        }
+        return relativeColumnIndex;
+    }
+
+    @Override
+    public int getColumnOffsetsSize() {
+        return numberOfPresentColumns * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+    }
+
+    @Override
+    public void setPageZero(ByteBuffer pageZero) {
+        throw new IllegalStateException("setPageZero is not supported in multi-page zero writer");
+    }
+
+    @Override
+    public void flush(ByteBuffer buf, int numberOfTuples, ITupleReference minKey, ITupleReference maxKey,
+            AbstractColumnTupleWriter columnWriter, ITreeIndexTupleWriter rowTupleWriter) throws HyracksDataException {
+        zerothSegmentWriter.setPageZero(buf);
+        buf.putInt(MEGA_LEAF_NODE_LENGTH, columnWriter.flush(this));
+        // Write min and max keys
+        int offset = buf.position();
+        buf.putInt(LEFT_MOST_KEY_OFFSET, offset);
+        offset += rowTupleWriter.writeTuple(minKey, buf.array(), offset);
+        buf.putInt(RIGHT_MOST_KEY_OFFSET, offset);
+        rowTupleWriter.writeTuple(maxKey, buf.array(), offset);
+
+        // Write page information
+        buf.putInt(TUPLE_COUNT_OFFSET, numberOfTuples);
+        buf.put(FLAG_OFFSET, flagCode());
+        buf.putInt(NUMBER_OF_COLUMNS_OFFSET, getNumberOfColumns());
+        buf.putInt(SIZE_OF_COLUMNS_OFFSETS_OFFSET, getColumnOffsetsSize());
+        // write the number of segments
+        buf.putInt(NUMBER_OF_PAGE_ZERO_SEGMENTS_OFFSET, numberOfPageZeroSegments);
+        // write the max column count in the zeroth segment
+        buf.putInt(MAX_COLUMNS_IN_ZEROTH_SEGMENT_OFFSET, numberOfColumnInZerothSegment);
+
+        // write the maximum columnIndex of each segment into the header.
+        for (int i = 0; i < numberOfPageZeroSegments; i++) {
+            int columnIndexOffset = MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET + i * Integer.BYTES;
+            if (i == 0) {
+                int presentColumnIndex = numberOfColumnInZerothSegment - 1;
+                buf.putInt(columnIndexOffset, presentColumns[presentColumnIndex]);
+            } else if (i == numberOfPageZeroSegments - 1) {
+                buf.putInt(columnIndexOffset, presentColumns[numberOfPresentColumns - 1]);
+            } else {
+                int presentColumnIndex = numberOfColumnInZerothSegment + i * maximumNumberOfColumnsInAPage;
+                buf.putInt(columnIndexOffset, presentColumns[presentColumnIndex - 1]);
+            }
+        }
+
+        // finish the remaining segments and reset the collected meta info
+        segments.finish();
+        columnWriter.reset();
+    }
+
+    @Override
+    public int getHeaderSize() {
+        return MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET + numberOfPageZeroSegments * Integer.BYTES;
+    }
+
+    public static int getHeaderSpace(int numberOfPageZeroSegments) {
+        return MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET + numberOfPageZeroSegments * Integer.BYTES;
+    }
+}
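
Note (illustrative sketch, not part of the patch): the segment count and the extra header space in SparseColumnMultiPageZeroWriter follow directly from the constants above. The snippet below reproduces the same arithmetic outside the class; the page size, entry sizes, and column counts are assumptions for illustration:

    // Standalone reproduction of the sizing arithmetic in SparseColumnMultiPageZeroWriter.
    final class PageZeroSizingSketch {
        static int numberOfSegments(int presentColumns, int zerothSegmentColumns, int maxColumnsPerPage) {
            int beyondZeroth = presentColumns - zerothSegmentColumns;
            if (beyondZeroth <= 0) {
                return 1; // everything fits in the zeroth segment
            }
            return 1 + (beyondZeroth + maxColumnsPerPage - 1) / maxColumnsPerPage; // ceiling division
        }

        public static void main(String[] args) {
            int pageSize = 32 * 1024;                                      // assumed buffer-cache page size
            int perColumnBytes = 8 /* index + offset */ + 16 /* filter */; // assumed entry sizes
            int maxColumnsPerPage = pageSize / perColumnBytes;             // mirrors maximumNumberOfColumnsInAPage
            int segments = numberOfSegments(5000, 40, maxColumnsPerPage);
            // The header grows by one int per segment, as in getHeaderSpace(numberOfPageZeroSegments).
            System.out.println("segments=" + segments + ", extra header bytes=" + segments * Integer.BYTES);
        }
    }
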
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/NoOpWriteMultiPageOp.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/NoOpWriteMultiPageOp.java
index 5c929c1..aa3cb71 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/NoOpWriteMultiPageOp.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/NoOpWriteMultiPageOp.java
@@ -35,6 +35,11 @@
     }
 
     @Override
+    public ByteBuffer confiscatePageZeroPersistent() throws HyracksDataException {
+        return null;
+    }
+
+    @Override
     public ByteBuffer confiscateTemporary() throws HyracksDataException {
         return null;
     }
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/TestWriteMultiPageOp.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/TestWriteMultiPageOp.java
index 8e01740..556a87c 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/TestWriteMultiPageOp.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/TestWriteMultiPageOp.java
@@ -20,6 +20,7 @@
 
 import java.nio.ByteBuffer;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
 
 public class TestWriteMultiPageOp implements IColumnWriteMultiPageOp {
@@ -37,6 +38,11 @@
     }
 
     @Override
+    public ByteBuffer confiscatePageZeroPersistent() throws HyracksDataException {
+        return dummyBufferCache.allocate(fileId).getBuffer();
+    }
+
+    @Override
     public ByteBuffer confiscateTemporary() {
         return dummyBufferCache.allocateTemporary();
     }
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
index fe5ef54..faa2a87 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
@@ -19,8 +19,6 @@
 package org.apache.asterix.column.test.bytes;
 
 import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.HEADER_SIZE;
-import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.MEGA_LEAF_NODE_LENGTH;
-import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.NUMBER_OF_COLUMNS_OFFSET;
 import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.TUPLE_COUNT_OFFSET;
 
 import java.io.File;
@@ -190,14 +188,8 @@
     protected void writeFullPage(ByteBuffer pageZero, AbstractColumnTupleWriter writer, int tupleCount)
             throws HyracksDataException {
         pageZero.clear();
-        //Reserve the header space
-        pageZero.position(HEADER_SIZE);
-        pageZero.putInt(MEGA_LEAF_NODE_LENGTH, writer.flush(pageZero, new DefaultColumnPageZeroWriter()));
-        //Write page header
-        int numberOfColumn = writer.getAbsoluteNumberOfColumns(false);
-        pageZero.putInt(TUPLE_COUNT_OFFSET, tupleCount);
-        pageZero.putInt(NUMBER_OF_COLUMNS_OFFSET, numberOfColumn);
-
+        DefaultColumnPageZeroWriter pageZeroWriter = new DefaultColumnPageZeroWriter();
+        pageZeroWriter.flush(pageZero, tupleCount, writer);
     }
 
     protected boolean isFull(AbstractColumnTupleWriter columnWriter, int tupleCount, ITupleReference tuple) {
@@ -210,7 +202,7 @@
         //Reserved for the number of pages
         int requiredFreeSpace = HEADER_SIZE;
         //Columns' Offsets
-        requiredFreeSpace += columnWriter.getColumnOccupiedSpace(true);
+        requiredFreeSpace += columnWriter.getPageZeroWriterOccupiedSpace(100, true, false);
         //Occupied space from previous writes
         requiredFreeSpace += columnWriter.getPrimaryKeysEstimatedSize();
         //New tuple required space
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/NoOpColumnBatchWriter.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/NoOpColumnBatchWriter.java
index ea4dbe6a..0038fbe 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/NoOpColumnBatchWriter.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/NoOpColumnBatchWriter.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.column.values.writer;
 
-import java.nio.ByteBuffer;
 import java.util.PriorityQueue;
 
 import org.apache.asterix.column.values.IColumnBatchWriter;
@@ -33,8 +32,8 @@
     }
 
     @Override
-    public void setPageZeroWriter(ByteBuffer pageZero, IColumnPageZeroWriter pageZeroWriter,
-            int[] presentColumnsIndexes, int numberOfColumns) {
+    public void setPageZeroWriter(IColumnPageZeroWriter pageZeroWriter, int[] presentColumnsIndexes,
+            int numberOfColumns) {
         // NoOp
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
index 973d9a1..86cac57 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
@@ -88,6 +88,10 @@
         return cloudIOManager.punchHole(handle.getFileHandle(), offset, length);
     }
 
+    public BufferCache getBufferCache() {
+        return bufferCache;
+    }
+
     /**
      * Whether the sweep operation should stop or proceed
      * Stopping condition:
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
index b52f59f..705df6a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
@@ -64,7 +64,8 @@
      *
      * @return the size needed to store columns' offsets
      */
-    public abstract int getColumnOccupiedSpace(boolean includeCurrentTupleColumns);
+    public abstract int getPageZeroWriterOccupiedSpace(int maxColumnsInPageZerothSegment,
+            boolean includeCurrentTupleColumns, boolean adaptive);
 
     /**
      * @return maximum number of tuples to be stored per page (i.e., page0)
@@ -88,8 +89,7 @@
      *
      * @return total flushed length (including page zero)
      */
-    public abstract int flush(ByteBuffer pageZero, IColumnPageZeroWriter columnPageZeroWriter)
-            throws HyracksDataException;
+    public abstract int flush(IColumnPageZeroWriter columnPageZeroWriter) throws HyracksDataException;
 
     /**
      * Close the current writer and release all allocated temporary buffers
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnWriteMultiPageOp.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnWriteMultiPageOp.java
index 2309fe1..b46b67d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnWriteMultiPageOp.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnWriteMultiPageOp.java
@@ -38,6 +38,8 @@
      */
     ByteBuffer confiscatePersistent() throws HyracksDataException;
 
+    ByteBuffer confiscatePageZeroPersistent() throws HyracksDataException;
+
     /**
      * Persist all confiscated persistent buffers to disk
      */
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
index 98fa419..de2d1e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
@@ -25,10 +25,13 @@
 
 import java.util.BitSet;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read.CloudColumnReadContext;
 import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweepPlanner;
 import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweeper;
 import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import it.unimi.dsi.fastutil.ints.IntArrays;
 import it.unimi.dsi.fastutil.longs.LongArrays;
@@ -38,6 +41,7 @@
  * Computes columns offsets, lengths, and pages
  */
 public final class ColumnRanges {
+    private static final Logger LOGGER = LogManager.getLogger();
     private static final LongComparator OFFSET_COMPARATOR = IntPairUtil.FIRST_COMPARATOR;
     private final int numberOfPrimaryKeys;
 
@@ -79,7 +83,7 @@
      *
      * @param leafFrame to compute the ranges for
      */
-    public void reset(ColumnBTreeReadLeafFrame leafFrame) {
+    public void reset(ColumnBTreeReadLeafFrame leafFrame) throws HyracksDataException {
         reset(leafFrame, EMPTY, EMPTY, EMPTY);
     }
 
@@ -89,7 +93,7 @@
      * @param leafFrame to compute the ranges for
      * @param plan      eviction plan
      */
-    public void reset(ColumnBTreeReadLeafFrame leafFrame, BitSet plan) {
+    public void reset(ColumnBTreeReadLeafFrame leafFrame, BitSet plan) throws HyracksDataException {
         reset(leafFrame, plan, EMPTY, EMPTY);
     }
 
@@ -102,7 +106,7 @@
      * @param cloudOnlyColumns locked columns that cannot be read from a local disk
      */
     public void reset(ColumnBTreeReadLeafFrame leafFrame, BitSet requestedColumns, BitSet evictableColumns,
-            BitSet cloudOnlyColumns) {
+            BitSet cloudOnlyColumns) throws HyracksDataException {
         // Set leafFrame
         this.leafFrame = leafFrame;
         // Ensure arrays capacities (given the leafFrame's columns and pages)
@@ -122,6 +126,12 @@
 
         int columnOrdinal = 0;
         for (int i = 0; i < numberOfColumns; i++) {
+            if (offsetColumnIndexPairs[i] == 0) { // no column's offset can legitimately be zero
+                LOGGER.warn(
+                        "Unexpected zero column offset at index {}. This may indicate a logic error or data inconsistency.",
+                        i);
+                continue;
+            }
             int columnIndex = getColumnIndexFromPair(offsetColumnIndexPairs[i]);
             int offset = getOffsetFromPair(offsetColumnIndexPairs[i]);
             int nextOffset = getOffsetFromPair(offsetColumnIndexPairs[i + 1]);
@@ -165,7 +175,7 @@
      * @param columnIndex column index
      * @return pageID
      */
-    public int getColumnStartPageIndex(int columnIndex) {
+    public int getColumnStartPageIndex(int columnIndex) throws HyracksDataException {
         int pageSize = leafFrame.getBuffer().capacity();
         return getColumnPageIndex(leafFrame.getColumnOffset(columnIndex), pageSize);
     }
@@ -176,7 +186,7 @@
      * @param columnIndex column index
      * @return number of pages
      */
-    public int getColumnNumberOfPages(int columnIndex) {
+    public int getColumnNumberOfPages(int columnIndex) throws HyracksDataException {
         int pageSize = leafFrame.getBuffer().capacity();
         int offset = getColumnStartOffset(leafFrame.getColumnOffset(columnIndex), pageSize);
         int firstBufferLength = pageSize - offset;
@@ -297,8 +307,18 @@
         for (int i = 0; i < leafFrame.getNumberOfColumns(); i++) {
             builder.append(String.format("%03d", i));
             builder.append(":");
-            int startPageId = getColumnStartPageIndex(i);
-            int columnPagesCount = getColumnNumberOfPages(i);
+            int startPageId = 0;
+            try {
+                startPageId = getColumnStartPageIndex(i);
+            } catch (HyracksDataException e) {
+                throw new RuntimeException(e);
+            }
+            int columnPagesCount = 0;
+            try {
+                columnPagesCount = getColumnNumberOfPages(i);
+            } catch (HyracksDataException e) {
+                throw new RuntimeException(e);
+            }
             printColumnPages(builder, numberOfPages, startPageId, columnPagesCount);
         }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/IColumnReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/IColumnReadContext.java
index 86a650b..e37b9b1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/IColumnReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/IColumnReadContext.java
@@ -43,6 +43,9 @@
     ICachedPage pinNext(ColumnBTreeReadLeafFrame leafFrame, IBufferCache bufferCache, int fileId)
             throws HyracksDataException;
 
+    void preparePageZeroSegments(ColumnBTreeReadLeafFrame leafFrame, IBufferCache bufferCache, int fileId)
+            throws HyracksDataException;
+
     /**
      * Prepare the columns' pages
      * Notes:
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
index 8d64eec..2cefab3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
@@ -131,6 +131,23 @@
     }
 
     @Override
+    public void preparePageZeroSegments(ColumnBTreeReadLeafFrame leafFrame, IBufferCache bufferCache, int fileId)
+            throws HyracksDataException {
+        if (leafFrame.getNumberOfPageZeroSegments() <= 1) { // only the zeroth segment exists, and it is already pinned
+            return;
+        }
+
+        // pin the required page segments
+        mergedPageRanges.clear();
+        int pageZeroId = leafFrame.getPageId();
+        BitSet pageZeroSegmentRanges =
+                leafFrame.markRequiredPageZeroSegments(projectedColumns, pageZeroId, operation == MERGE);
+        // Merge the page zero segments ranges
+        mergePageZeroSegmentRanges(pageZeroSegmentRanges);
+        mergedPageRanges.pin(columnCtx, bufferCache, fileId, pageZeroId);
+    }
+
+    @Override
     public void prepareColumns(ColumnBTreeReadLeafFrame leafFrame, IBufferCache bufferCache, int fileId)
             throws HyracksDataException {
         if (leafFrame.getTupleCount() == 0) {
@@ -139,9 +156,12 @@
 
         columnRanges.reset(leafFrame, projectedColumns, plan, cloudOnlyColumns);
         int pageZeroId = leafFrame.getPageId();
+        int numberOfPageZeroSegments = leafFrame.getNumberOfPageZeroSegments();
 
         if (operation == MERGE) {
-            pinAll(fileId, pageZeroId, leafFrame.getMegaLeafNodeNumberOfPages() - 1, bufferCache);
+            // will contain column pages along with page zero segments
+            pinAll(fileId, pageZeroId + numberOfPageZeroSegments - 1,
+                    leafFrame.getMegaLeafNodeNumberOfPages() - numberOfPageZeroSegments, bufferCache);
         } else {
             pinProjected(fileId, pageZeroId, bufferCache);
         }
@@ -198,6 +218,36 @@
         mergedPageRanges.pin(columnCtx, bufferCache, fileId, pageZeroId);
     }
 
+    private void mergePageZeroSegmentRanges(BitSet pageZeroSegmentRanges) {
+        // Since the 0th segment is already pinned, we can skip it
+        pageZeroSegmentRanges.clear(0);
+        if (pageZeroSegmentRanges.cardinality() == 0) {
+            // No additional page zero segments are required, nothing to merge
+            return;
+        }
+
+        int start = -1;
+        int prev = -1;
+
+        int current = pageZeroSegmentRanges.nextSetBit(0);
+        while (current >= 0) {
+            if (start == -1) {
+                // Start of a new range
+                start = current;
+            } else if (current != prev + 1) {
+                // Discontinuous: close the current range
+                mergedPageRanges.addRange(start, prev);
+                start = current;
+            }
+
+            prev = current;
+            current = pageZeroSegmentRanges.nextSetBit(current + 1);
+        }
+
+        // Close the final range
+        mergedPageRanges.addRange(start, prev);
+    }
+
     @Override
     public void release(IBufferCache bufferCache) throws HyracksDataException {
         // Release might differ in the future if prefetching is supported
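
Note (illustrative sketch, not part of the patch): mergePageZeroSegmentRanges collapses runs of consecutive segment indexes into single ranges before pinning. A small standalone sketch of the same idea, printing ranges instead of pinning pages:

    import java.util.BitSet;

    // Collapses consecutive set bits into [start, end] ranges, mirroring the merging
    // done before page zero segments are pinned.
    final class RangeMergeSketch {
        public static void main(String[] args) {
            BitSet segments = new BitSet();
            segments.set(1, 4);  // segments 1..3
            segments.set(6);
            segments.set(8, 10); // segments 8..9

            int start = -1;
            int prev = -1;
            for (int current = segments.nextSetBit(0); current >= 0; current = segments.nextSetBit(current + 1)) {
                if (start == -1) {
                    start = current;                                         // open the first range
                } else if (current != prev + 1) {
                    System.out.println("[" + start + ", " + prev + "]");     // close the finished range
                    start = current;
                }
                prev = current;
            }
            if (start != -1) {
                System.out.println("[" + start + ", " + prev + "]");         // close the final range
            }
        }
    }

Running the sketch prints [1, 3], [6, 6], [8, 9], i.e., three pin requests instead of six.
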
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/DefaultColumnReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/DefaultColumnReadContext.java
index 11275a4..5917f65 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/DefaultColumnReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/DefaultColumnReadContext.java
@@ -76,7 +76,14 @@
     }
 
     @Override
+    public void preparePageZeroSegments(ColumnBTreeReadLeafFrame leafFrame, IBufferCache bufferCache, int fileId)
+            throws HyracksDataException {
+        // NoOp
+    }
+
+    @Override
     public void prepareColumns(ColumnBTreeReadLeafFrame leafFrame, IBufferCache bufferCache, int fileId) {
+        // If there are page segments, they are expected to be present in flash cache.
         // NoOp
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java
index d12f649..5b837a1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java
@@ -20,6 +20,7 @@
 
 import static org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils.isMergedComponent;
 
+import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -58,6 +59,7 @@
     private final IntSet indexedColumns;
     private final ISweepClock clock;
     private final int evictionPlanReevaluationThreshold;
+    private final List<ICachedPage> pageZeroSegmentsTempHolder;
     private int numberOfColumns;
     private long lastAccess;
     private int maxSize;
@@ -78,6 +80,7 @@
         plan = new BitSet();
         reevaluatedPlan = new BitSet();
         punchableThreshold = INITIAL_PUNCHABLE_THRESHOLD;
+        pageZeroSegmentsTempHolder = new ArrayList<>();
         this.evictionPlanReevaluationThreshold = evictionPlanReevaluationThreshold;
     }
 
@@ -215,12 +218,19 @@
         ICachedPage page = bufferCache.pin(dpid);
         try {
             leafFrame.setPage(page);
+            pageZeroSegmentsTempHolder.clear();
+            leafFrame.pinPageZeroSegments(columnBTree.getFileId(), columnBTree.getBulkloadLeafStart(),
+                    pageZeroSegmentsTempHolder, bufferCache, null);
             ranges.reset(leafFrame);
             for (int i = 0; i < leafFrame.getNumberOfColumns(); i++) {
                 sizes[i] = Math.max(sizes[i], ranges.getColumnLength(i));
                 maxSize = Math.max(maxSize, sizes[i]);
             }
         } finally {
+            // unpin the segment pages
+            for (ICachedPage segmentPage : pageZeroSegmentsTempHolder) {
+                bufferCache.unpin(segmentPage);
+            }
             bufferCache.unpin(page);
         }
     }
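
Note (hedged sketch, not part of the patch): the planner here, and the sweeper below, follow the same pin/unpin discipline for page zero segments: pin every segment beyond the zeroth up front, do the work, and release the segments afterwards. The helper below is hypothetical and only uses buffer-cache calls that appear in this patch:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.storage.common.buffercache.IBufferCache;
    import org.apache.hyracks.storage.common.buffercache.ICachedPage;
    import org.apache.hyracks.storage.common.file.BufferedFileHandle;

    // Pins every page zero segment beyond the zeroth, runs the caller's work, and always
    // unpins the segments, so an exception while computing ranges cannot leak pinned pages.
    final class SegmentPinSketch {
        static void withSegmentsPinned(IBufferCache bufferCache, int fileId, int pageZeroId,
                int numberOfPageZeroSegments, Runnable work) throws HyracksDataException {
            List<ICachedPage> pinnedSegments = new ArrayList<>();
            try {
                for (int i = 1; i < numberOfPageZeroSegments; i++) {
                    long dpid = BufferedFileHandle.getDiskPageId(fileId, pageZeroId + i);
                    pinnedSegments.add(bufferCache.pin(dpid));
                }
                work.run(); // e.g., ranges.reset(leafFrame)
            } finally {
                for (ICachedPage segmentPage : pinnedSegments) {
                    bufferCache.unpin(segmentPage);
                }
            }
        }
    }
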
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
index 9fc3b8d..1f0c7f4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
@@ -38,6 +38,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 import org.apache.hyracks.util.StorageUtil;
@@ -50,11 +51,13 @@
     private final ColumnSweepLockInfo lockedColumns;
     private final ColumnRanges ranges;
     private final List<ILSMDiskComponent> sweepableComponents;
+    private final List<ICachedPage> segmentPagesTempHolder;
 
     public ColumnSweeper(int numberOfPrimaryKeys) {
         lockedColumns = new ColumnSweepLockInfo();
         ranges = new ColumnRanges(numberOfPrimaryKeys);
         sweepableComponents = new ArrayList<>();
+        segmentPagesTempHolder = new ArrayList<>();
     }
 
     public long sweep(BitSet plan, SweepContext context, IColumnTupleProjector sweepProjector)
@@ -175,6 +178,8 @@
                     columnsLocked = page0.trySweepLock(lockedColumns);
                     if (columnsLocked) {
                         leafFrame.setPage(page0);
+                        leafFrame.pinPageZeroSegments(fileId, leafFrame.getPageId(), segmentPagesTempHolder,
+                                context.getBufferCache(), SweepBufferCacheReadContext.INSTANCE);
                         ranges.reset(leafFrame, plan);
                         freedSpace += punchHoles(context, leafFrame);
                     }
@@ -182,6 +187,10 @@
                     if (columnsLocked) {
                         page0.sweepUnlock();
                     }
+                    // unpin segment pages
+                    for (ICachedPage cachedPage : segmentPagesTempHolder) {
+                        context.unpin(cachedPage, bcOpCtx);
+                    }
                     context.unpin(page0, bcOpCtx);
                 }
             }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/AbstractColumnBTreeLeafFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/AbstractColumnBTreeLeafFrame.java
index f3a7c4f..f03195b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/AbstractColumnBTreeLeafFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/AbstractColumnBTreeLeafFrame.java
@@ -100,12 +100,11 @@
         // Duplicate to avoid interference when scanning the dataset twice
         this.buf = page.getBuffer().duplicate();
         buf.clear();
-        buf.position(HEADER_SIZE);
         resetPageZeroReader();
     }
 
     protected void resetPageZeroReader() {
-
+        buf.position(HEADER_SIZE);
     };
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
index 8d1080a..88fe5bf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
@@ -49,11 +49,14 @@
 public final class ColumnBTreeBulkloader extends BTreeNSMBulkLoader implements IColumnWriteMultiPageOp {
     private static final Logger LOGGER = LogManager.getLogger();
     private final List<CachedPage> columnsPages;
+    private final List<ICachedPage> pageZeroSegments; // segments 1 through the last segment of page zero (the zeroth is excluded)
     private final List<CachedPage> tempConfiscatedPages;
     private final ColumnBTreeWriteLeafFrame columnarFrame;
     private final AbstractColumnTupleWriter columnWriter;
     private final ISplitKey lowKey;
     private final IColumnWriteContext columnWriteContext;
+    private final int maxColumnsInPageZerothSegment;
+    private final boolean adaptiveWriter;
     private boolean setLowKey;
     private int tupleCount;
 
@@ -61,6 +64,7 @@
     private int numberOfLeafNodes;
     private int numberOfPagesInCurrentLeafNode;
     private int maxNumberOfPagesForAColumn;
+    private int maxNumberOfPageZeroSegments; // excludes the zeroth segment
     private int maxNumberOfPagesInALeafNode;
     private int maxTupleCount;
     private int lastRequiredFreeSpace;
@@ -69,6 +73,7 @@
             ITreeIndexFrame leafFrame, IBufferCacheWriteContext writeContext) throws HyracksDataException {
         super(fillFactor, verifyInput, callback, index, leafFrame, writeContext);
         columnsPages = new ArrayList<>();
+        pageZeroSegments = new ArrayList<>();
         tempConfiscatedPages = new ArrayList<>();
         columnWriteContext = (IColumnWriteContext) writeContext;
         columnarFrame = (ColumnBTreeWriteLeafFrame) leafFrame;
@@ -78,10 +83,15 @@
         lowKey.getTuple().setFieldCount(cmp.getKeyFieldCount());
         setLowKey = true;
 
+        // Writer config
+        maxColumnsInPageZerothSegment = 40; // set to a low value for testing; should come from the configuration.
+        adaptiveWriter = false; // should come from the configuration.
+
         // For logging. Starts with 1 for page0
         numberOfPagesInCurrentLeafNode = 1;
         maxNumberOfPagesForAColumn = 0;
         maxNumberOfPagesInALeafNode = 0;
+        maxNumberOfPageZeroSegments = 0;
         numberOfLeafNodes = 1;
         maxTupleCount = 0;
         lastRequiredFreeSpace = 0;
@@ -116,10 +126,10 @@
             //We reached the maximum number of tuples
             return true;
         }
-        columnWriter.updateColumnMetadataForCurrentTuple(tuple);
-        int requiredFreeSpace = AbstractColumnBTreeLeafFrame.HEADER_SIZE;
         //Columns' Offsets
-        requiredFreeSpace += columnWriter.getColumnOccupiedSpace(true);
+        columnWriter.updateColumnMetadataForCurrentTuple(tuple);
+        int requiredFreeSpace =
+                columnWriter.getPageZeroWriterOccupiedSpace(maxColumnsInPageZerothSegment, true, adaptiveWriter);
         //Occupied space from previous writes
         requiredFreeSpace += columnWriter.getPrimaryKeysEstimatedSize();
         //min and max tuples' sizes
@@ -144,7 +154,8 @@
         if (tupleCount > 0) {
             splitKey.getTuple().resetByTupleOffset(splitKey.getBuffer().array(), 0);
             try {
-                columnarFrame.flush(columnWriter, tupleCount, lowKey.getTuple(), splitKey.getTuple());
+                columnarFrame.flush(columnWriter, tupleCount, maxColumnsInPageZerothSegment, lowKey.getTuple(),
+                        splitKey.getTuple(), this);
             } catch (Exception e) {
                 logState(e);
                 throw e;
@@ -173,7 +184,8 @@
         if (tupleCount > 0) {
             //We need to flush columns to confiscate all columns pages first before calling propagateBulk
             try {
-                columnarFrame.flush(columnWriter, tupleCount, lowKey.getTuple(), splitKey.getTuple());
+                columnarFrame.flush(columnWriter, tupleCount, maxColumnsInPageZerothSegment, lowKey.getTuple(),
+                        splitKey.getTuple(), this);
             } catch (Exception e) {
                 logState(e);
                 throw e;
@@ -190,7 +202,7 @@
          * Write columns' pages first to ensure they (columns' pages) are written before pageZero.
          * It ensures pageZero does not land in between columns' pages if compression is enabled
          */
-        writeColumnsPages();
+        writeColumnAndSegmentPages();
         //Then write page0
         write(leafFrontier.page);
 
@@ -219,10 +231,34 @@
          * Write columns' pages first to ensure they (columns' pages) are written before pageZero.
          * It ensures pageZero does not land in between columns' pages if compression is enabled
          */
-        writeColumnsPages();
+        writeColumnAndSegmentPages();
         super.writeLastLeaf(page);
     }
 
+    private void writeColumnAndSegmentPages() throws HyracksDataException {
+        for (ICachedPage c : columnsPages) {
+            write(c);
+        }
+
+        // For logging
+        int numberOfPagesInPersistedColumn = columnsPages.size();
+        maxNumberOfPagesForAColumn = Math.max(maxNumberOfPagesForAColumn, numberOfPagesInPersistedColumn);
+        numberOfPagesInCurrentLeafNode += numberOfPagesInPersistedColumn;
+        columnsPages.clear();
+
+        // persist page zero segments from 1 to the last segment
+        for (ICachedPage page : pageZeroSegments) {
+            write(page);
+        }
+
+        int numberOfPageZeroSegments = pageZeroSegments.size();
+        maxNumberOfPageZeroSegments = Math.max(maxNumberOfPageZeroSegments, numberOfPageZeroSegments);
+        pageZeroSegments.clear();
+
+        // Indicate to the columnWriteContext that all columns were persisted
+        columnWriteContext.columnsPersisted();
+    }
+
     private void writeColumnsPages() throws HyracksDataException {
         for (ICachedPage c : columnsPages) {
             write(c);
@@ -244,6 +280,10 @@
             bufferCache.returnPage(page, false);
         }
 
+        for (ICachedPage page : pageZeroSegments) {
+            bufferCache.returnPage(page, false);
+        }
+
         for (ICachedPage page : tempConfiscatedPages) {
             bufferCache.returnPage(page, false);
         }
@@ -291,6 +331,15 @@
     }
 
     @Override
+    public ByteBuffer confiscatePageZeroPersistent() throws HyracksDataException {
+        int pageId = freePageManager.takePage(metaFrame);
+        long dpid = BufferedFileHandle.getDiskPageId(fileId, pageId);
+        CachedPage page = (CachedPage) bufferCache.confiscatePage(dpid);
+        pageZeroSegments.add(page);
+        return page.getBuffer();
+    }
+
+    @Override
     public void persist() throws HyracksDataException {
         writeColumnsPages();
     }
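
Note (rough, illustrative model, not the patch's exact accounting): the isFull estimate earlier in this file now budgets page zero space through getPageZeroWriterOccupiedSpace rather than HEADER_SIZE plus per-column offsets. The sketch below models that budget for the sparse multi-page layout; the fixed header size, the entry sizes, and the formula itself are assumptions:

    // Rough model of the page zero footprint for the sparse multi-page layout.
    // The real accounting is done by the tuple writer's getPageZeroWriterOccupiedSpace.
    final class PageZeroBudgetSketch {
        static int zerothSegmentBytes(int presentColumns, int zerothSegmentMaxColumns, int numberOfSegments) {
            int assumedFixedHeader = 64;                                                  // placeholder for the standard header fields
            int headerBytes = assumedFixedHeader + numberOfSegments * Integer.BYTES;      // one max-column-index int per segment
            int columnsInZeroth = Math.min(presentColumns, zerothSegmentMaxColumns);
            int entryBytes = 8 /* column index + offset */ + 16 /* filter */;             // assumed sizes
            return headerBytes + columnsInZeroth * entryBytes;
        }

        public static void main(String[] args) {
            // Columns beyond the zeroth segment spill into extra page zero segments, so only the
            // first zerothSegmentMaxColumns entries compete with the primary keys and min/max
            // tuples for space in page zero itself.
            System.out.println(zerothSegmentBytes(5000, 40, 5) + " bytes in the zeroth segment");
        }
    }
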
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
index 2669d4b..6bcbd0f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
@@ -92,8 +92,9 @@
         do {
             page0 = context.pinNext(frame, bufferCache, fileId);
             stats.getPageCounter().update(1);
-            context.prepareColumns(frame, bufferCache, fileId);
+            context.preparePageZeroSegments(frame, bufferCache, fileId);
             frameTuple.newPage();
+            context.prepareColumns(frame, bufferCache, fileId);
             setCursorPosition();
             nextLeafPage = frame.getNextLeaf();
         } while (frame.getTupleCount() == 0 && nextLeafPage > 0);
@@ -131,8 +132,9 @@
         frame.setPage(page0);
         frame.setMultiComparator(originalKeyCmp);
         if (frame.getTupleCount() > 0) {
-            context.prepareColumns(frame, bufferCache, fileId);
+            context.preparePageZeroSegments(frame, bufferCache, fileId);
             frameTuple.newPage();
+            context.prepareColumns(frame, bufferCache, fileId);
             initCursorPosition(searchPred);
         } else {
             yieldFirstCall = false;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
index 58a62ba..4a78400 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
@@ -18,6 +18,10 @@
  */
 package org.apache.hyracks.storage.am.lsm.btree.column.impls.btree;
 
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleReference;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
@@ -25,10 +29,11 @@
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
 import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
-
 public final class ColumnBTreeReadLeafFrame extends AbstractColumnBTreeLeafFrame {
     private final AbstractColumnTupleReader columnarTupleReader;
     private final ITreeIndexTupleReference leftMostTuple;
@@ -45,8 +50,23 @@
 
     @Override
     protected void resetPageZeroReader() {
-        columnPageZeroReader = pageZeroWriterFlavorSelector.createPageZeroReader(getFlagByte());
+        columnPageZeroReader = pageZeroWriterFlavorSelector.createPageZeroReader(getFlagByte(), buf.capacity());
         columnPageZeroReader.reset(buf);
+        buf.position(columnPageZeroReader.getHeaderSize());
+    }
+
+    public void pinPageZeroSegments(int fileId, int pageZeroId, List<ICachedPage> segmentPages,
+            IBufferCache bufferCache, IBufferCacheReadContext bcOpCtx) throws HyracksDataException {
+        // pins all page zero segments beyond the zeroth; used by the column sweep planner and sweeper
+        int numberOfPageSegments = getNumberOfPageZeroSegments();
+        for (int i = 1; i < numberOfPageSegments; i++) {
+            long dpid = BufferedFileHandle.getDiskPageId(fileId, pageZeroId + i);
+            if (bcOpCtx != null) {
+                segmentPages.add(bufferCache.pin(dpid, bcOpCtx));
+            } else {
+                segmentPages.add(bufferCache.pin(dpid));
+            }
+        }
     }
 
     @Override
@@ -71,7 +91,7 @@
         return rightMostTuple;
     }
 
-    public void getAllColumns(IntOpenHashSet presentColumns) {
+    public void getAllColumns(BitSet presentColumns) {
         columnPageZeroReader.getAllColumns(presentColumns);
     }
 
@@ -88,11 +108,15 @@
         return BufferedFileHandle.getPageId(((CachedPage) page).getDiskPageId());
     }
 
+    public int getNumberOfPageZeroSegments() {
+        return columnPageZeroReader.getNumberOfPageZeroSegments();
+    }
+
     public int getNumberOfColumns() {
         return columnPageZeroReader.getNumberOfPresentColumns();
     }
 
-    public int getColumnOffset(int columnIndex) {
+    public int getColumnOffset(int columnIndex) throws HyracksDataException {
         // update the exception message.
         if (!columnPageZeroReader.isValidColumn(columnIndex)) {
             throw new IndexOutOfBoundsException(columnIndex + " >= " + getNumberOfColumns());
@@ -100,7 +124,7 @@
         return columnPageZeroReader.getColumnOffset(columnIndex);
     }
 
-    public boolean isValidColumn(int columnIndex) {
+    public boolean isValidColumn(int columnIndex) throws HyracksDataException {
         return columnPageZeroReader.isValidColumn(columnIndex);
     }
 
@@ -112,6 +136,14 @@
         return columnPageZeroReader.getMegaLeafNodeLengthInBytes();
     }
 
+    public int getHeaderSize() {
+        return columnPageZeroReader.getHeaderSize();
+    }
+
+    public int getPageZeroSegmentCount() {
+        return columnPageZeroReader.getNumberOfPageZeroSegments();
+    }
+
     // flag needs to be directly accessed from the buffer, as this will be used to choose the pageReader
     public byte getFlagByte() {
         return buf.get(FLAG_OFFSET);
@@ -148,4 +180,12 @@
     public void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
         columnPageZeroReader.populateOffsetColumnIndexPairs(offsetColumnIndexPairs);
     }
+
+    public BitSet getPageZeroSegmentsPages() {
+        return columnPageZeroReader.getPageZeroSegmentsPages();
+    }
+
+    public BitSet markRequiredPageZeroSegments(BitSet projectedColumns, int pageZeroId, boolean markAll) {
+        return columnPageZeroReader.markRequiredPageSegments(projectedColumns, pageZeroId, markAll);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java
index f5b7893..acee2d5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java
@@ -22,6 +22,7 @@
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
@@ -47,34 +48,12 @@
         buf.putInt(NEXT_LEAF_OFFSET, -1);
     }
 
-    void flush(AbstractColumnTupleWriter columnWriter, int numberOfTuples, ITupleReference minKey,
-            ITupleReference maxKey) throws HyracksDataException {
-        IColumnPageZeroWriter pageZeroWriter = pageZeroWriterFlavorSelector.getPageZeroWriter();
-
-        // TODO(zero): Ideally, all these fields should be specific to the writer.
-        // However, some of the position constants are accessed directly elsewhere,
-        // so refactoring will require careful consideration to avoid breaking existing usage.
-
-        // Prepare the space for writing the columns' information such as the primary keys
-        buf.position(HEADER_SIZE);
-        // Flush the columns to persistence pages and write the length of the mega leaf node in pageZero
-        buf.putInt(MEGA_LEAF_NODE_LENGTH, columnWriter.flush(buf, pageZeroWriter));
-        // Write min and max keys
-        int offset = buf.position();
-        buf.putInt(LEFT_MOST_KEY_OFFSET, offset);
-        offset += rowTupleWriter.writeTuple(minKey, buf.array(), offset);
-        buf.putInt(RIGHT_MOST_KEY_OFFSET, offset);
-        rowTupleWriter.writeTuple(maxKey, buf.array(), offset);
-
-        // Write page information
-        buf.putInt(TUPLE_COUNT_OFFSET, numberOfTuples);
-        buf.put(FLAG_OFFSET, pageZeroWriter.flagCode());
-        buf.putInt(NUMBER_OF_COLUMNS_OFFSET, pageZeroWriter.getNumberOfColumns());
-        // correct the offset's, this all should be deferred to writer
-        buf.putInt(SIZE_OF_COLUMNS_OFFSETS_OFFSET, pageZeroWriter.getColumnOffsetsSize());
-
-        // reset the collected meta info
-        columnWriter.reset();
+    void flush(AbstractColumnTupleWriter columnWriter, int numberOfTuples, int zerothSegmentMaxColumns,
+            ITupleReference minKey, ITupleReference maxKey, IColumnWriteMultiPageOp multiPageOpRef)
+            throws HyracksDataException {
+        IColumnPageZeroWriter pageZeroWriter = pageZeroWriterFlavorSelector.getPageZeroWriter(multiPageOpRef,
+                zerothSegmentMaxColumns, getBuffer().capacity());
+        pageZeroWriter.flush(buf, numberOfTuples, minKey, maxKey, columnWriter, rowTupleWriter);
     }
 
     public AbstractColumnTupleWriter getColumnTupleWriter() {
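
The header bookkeeping removed above now lives behind IColumnPageZeroWriter.flush(..). A hedged sketch of what a single-page implementation can do, reconstructed from the removed lines; the constant names, their visibility, and the exact layering are assumptions rather than the writers shipped in this patch:

    @Override
    public void flush(ByteBuffer buf, int numberOfTuples, ITupleReference minKey, ITupleReference maxKey,
            AbstractColumnTupleWriter columnWriter, ITreeIndexTupleWriter rowTupleWriter) throws HyracksDataException {
        // reserve the header, then flush the columns and record the mega leaf node length
        buf.position(getHeaderSize());
        buf.putInt(MEGA_LEAF_NODE_LENGTH, columnWriter.flush(buf, this));
        // write the min and max keys right after the columns' metadata
        int offset = buf.position();
        buf.putInt(LEFT_MOST_KEY_OFFSET, offset);
        offset += rowTupleWriter.writeTuple(minKey, buf.array(), offset);
        buf.putInt(RIGHT_MOST_KEY_OFFSET, offset);
        rowTupleWriter.writeTuple(maxKey, buf.array(), offset);
        // page information
        buf.putInt(TUPLE_COUNT_OFFSET, numberOfTuples);
        buf.put(FLAG_OFFSET, flagCode());
        buf.putInt(NUMBER_OF_COLUMNS_OFFSET, getNumberOfColumns());
        buf.putInt(SIZE_OF_COLUMNS_OFFSETS_OFFSET, getColumnOffsetsSize());
        // reset the collected meta info
        columnWriter.reset();
    }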
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java
index 5a625fc..db4f778 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java
@@ -19,18 +19,24 @@
 package org.apache.hyracks.storage.am.lsm.btree.column.impls.btree;
 
 import java.nio.ByteBuffer;
+import java.util.BitSet;
 
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
 
 public interface IColumnPageZeroReader {
 
-    void reset(ByteBuffer pageZeroBuf);
+    default void reset(ByteBuffer pageZeroBuf) {
+        reset(pageZeroBuf, AbstractColumnBTreeLeafFrame.HEADER_SIZE);
+    }
 
-    int getColumnOffset(int columnIndex);
+    void reset(ByteBuffer pageZeroBuf, int headerSize);
 
-    int getColumnFilterOffset(int columnIndex);
+    int getColumnOffset(int columnIndex) throws HyracksDataException;
 
-    long getLong(int offset);
+    long getColumnFilterMin(int columnIndex) throws HyracksDataException;
+
+    long getColumnFilterMax(int columnIndex) throws HyracksDataException;
 
     void skipFilters();
 
@@ -44,7 +50,7 @@
 
     int getNumberOfPresentColumns();
 
-    int getRelativeColumnIndex(int columnIndex);
+    int getRelativeColumnIndex(int columnIndex) throws HyracksDataException;
 
     int getNextLeaf();
 
@@ -52,11 +58,21 @@
 
     int getPageZeroCapacity();
 
-    boolean isValidColumn(int columnIndex);
+    boolean isValidColumn(int columnIndex) throws HyracksDataException;
 
-    void getAllColumns(IntOpenHashSet presentColumns);
+    void getAllColumns(BitSet presentColumns);
 
     ByteBuffer getPageZeroBuf();
 
     void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs);
+
+    int getNumberOfPageZeroSegments();
+
+    BitSet getPageZeroSegmentsPages();
+
+    int getHeaderSize();
+
+    void resetStream(IColumnBufferProvider pageZeroSegmentBufferProvider) throws HyracksDataException;
+
+    BitSet markRequiredPageSegments(BitSet projectedColumns, int pageZeroId, boolean markAll);
 }
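
markRequiredPageSegments(..) leaves the column-to-segment mapping to the concrete readers. Purely to illustrate the arithmetic involved, a sketch under the assumption that the zeroth segment holds zerothSegmentMaxColumns entries and every later segment holds columnsPerSegment entries (the real layout may differ):

    import java.util.BitSet;

    final class SegmentMappingSketch {
        // Maps projected column indexes to the page-zero segments that hold their offsets.
        static BitSet requiredSegments(BitSet projectedColumns, int zerothSegmentMaxColumns, int columnsPerSegment) {
            BitSet segments = new BitSet();
            for (int c = projectedColumns.nextSetBit(0); c >= 0; c = projectedColumns.nextSetBit(c + 1)) {
                if (c < zerothSegmentMaxColumns) {
                    segments.set(0); // served by page zero itself
                } else {
                    segments.set(1 + (c - zerothSegmentMaxColumns) / columnsPerSegment);
                }
            }
            return segments;
        }
    }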
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroWriter.java
index 041415a..3795c47 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroWriter.java
@@ -22,6 +22,9 @@
 import java.util.BitSet;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
 
 /**
  * Interface for writing column metadata to page zero of a column page.
@@ -39,14 +42,23 @@
     /** Flag code for sparse page zero writer */
     byte SPARSE_WRITER_FLAG = 1;
 
+    byte MULTI_PAGE_DEFAULT_WRITER_FLAG = 2;
+
+    byte MULTI_PAGE_SPARSE_WRITER_FLAG = 3;
+
+    int MIN_COLUMN_SPACE = 4 + 16; // column offset (4 bytes) + filter min/max (16 bytes)
+
     /**
      * Initializes the writer with page zero buffer and column information.
-     * 
-     * @param pageZeroBuf The page zero buffer to write to
+     *
      * @param presentColumns Array of column indexes that are present in this page
      * @param numberOfColumns Total number of columns in the schema (may be larger than presentColumns)
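+     * @param headerSize The size in bytes of the page zero header that precedes the columns' metadata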
      */
-    void reset(ByteBuffer pageZeroBuf, int[] presentColumns, int numberOfColumns);
+    void resetBasedOnColumns(int[] presentColumns, int numberOfColumns, int headerSize) throws HyracksDataException;
+
+    default void resetBasedOnColumns(int[] presentColumns, int numberOfColumns) throws HyracksDataException {
+        resetBasedOnColumns(presentColumns, numberOfColumns, AbstractColumnBTreeLeafFrame.HEADER_SIZE);
+    }
 
     /**
      * Returns the flag code that identifies this writer type.
@@ -69,7 +81,7 @@
      * @param relativeColumnIndex The relative column index within this page (for sparse layouts)
      * @param offset The byte offset where the column's data begins
      */
-    void putColumnOffset(int absoluteColumnIndex, int relativeColumnIndex, int offset);
+    void putColumnOffset(int absoluteColumnIndex, int relativeColumnIndex, int offset) throws HyracksDataException;
 
     /**
      * Stores filter information (min/max values) for a column.
@@ -79,7 +91,8 @@
      * @param normalizedMinValue The normalized minimum value in the column
      * @param normalizedMaxValue The normalized maximum value in the column
      */
-    void putColumnFilter(int relativeColumnIndex, long normalizedMinValue, long normalizedMaxValue);
+    void putColumnFilter(int relativeColumnIndex, long normalizedMinValue, long normalizedMaxValue)
+            throws HyracksDataException;
 
     /**
      * Writes primary key column data to page zero.
@@ -115,7 +128,7 @@
      * 
      * @return the page zero buffer
      */
-    ByteBuffer getPageZeroBuffer();
+    int getPageZeroBufferCapacity();
 
     /**
      * Maps an absolute column index to a relative index within this page.
@@ -133,4 +146,11 @@
      * @return size in bytes of column offset storage
      */
     int getColumnOffsetsSize();
+
+    void setPageZero(ByteBuffer pageZero);
+
+    void flush(ByteBuffer buf, int numberOfTuples, ITupleReference minKey, ITupleReference maxKey,
+            AbstractColumnTupleWriter columnWriter, ITreeIndexTupleWriter rowTupleWriter) throws HyracksDataException;
+
+    int getHeaderSize();
 }
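
MIN_COLUMN_SPACE bounds how densely column metadata can pack into page zero. A back-of-the-envelope sketch of the per-segment column capacity; the 32 KiB page size and 64-byte header are made-up example values, not values from this patch:

    final class PageZeroCapacitySketch {
        private static final int MIN_COLUMN_SPACE = 4 + 16; // offset + filter (min/max) per column

        // Upper bound on the number of column entries a single page-zero segment can hold.
        static int maxColumnsPerSegment(int pageSize, int headerSize) {
            return (pageSize - headerSize) / MIN_COLUMN_SPACE;
        }

        public static void main(String[] args) {
            System.out.println(maxColumnsPerSegment(32 * 1024, 64)); // prints 1635
        }
    }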
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroWriterFlavorSelector.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroWriterFlavorSelector.java
index fe2e24e..fa8e3f6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroWriterFlavorSelector.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroWriterFlavorSelector.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.storage.am.lsm.btree.column.impls.btree;
 
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+
 /**
  * Strategy interface for selecting the optimal page zero writer implementation.
  * 
@@ -40,24 +42,27 @@
      * @param spaceOccupiedByDefaultWriter Space in bytes required by the default writer
      * @param spaceOccupiedBySparseWriter Space in bytes required by the sparse writer
      */
-    void switchPageZeroWriterIfNeeded(int spaceOccupiedByDefaultWriter, int spaceOccupiedBySparseWriter);
+    void switchPageZeroWriterIfNeeded(int spaceOccupiedByDefaultWriter, int spaceOccupiedBySparseWriter,
+            boolean adaptive);
 
     /**
      * Creates the appropriate page zero reader for the given writer type.
-     * 
+     * <p>
      * This method is used during deserialization to create a reader that matches
      * the writer type used during serialization. The flag identifies which
      * layout was used.
-     * 
-     * @param flag The flag code identifying the writer type (0=default, 1=sparse)
+     *
+     * @param flag     The flag code identifying the writer type (0=default, 1=sparse)
+     * @param capacity the capacity (in bytes) of page zero
      * @return the appropriate reader instance
      */
-    IColumnPageZeroReader createPageZeroReader(byte flag);
+    IColumnPageZeroReader createPageZeroReader(byte flag, int capacity);
 
     /**
      * Returns the currently selected page zero writer instance.
      * 
      * @return the writer instance selected by the most recent call to switchPageZeroWriterIfNeeded
      */
-    IColumnPageZeroWriter getPageZeroWriter();
+    IColumnPageZeroWriter getPageZeroWriter(IColumnWriteMultiPageOp multiPageOpRef, int zerothSegmentMaxColumns,
+            int maximumColumnPerPageSegment);
 }
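
Illustrative only: the core space comparison behind switchPageZeroWriterIfNeeded(..). The flag codes follow the javadoc above (0 = default, 1 = sparse); how the adaptive flag and the multi-page variants factor in is left to the concrete selector:

    final class FlavorSelectionSketch {
        // Prefer the sparse layout only when it actually occupies less space in page zero.
        static byte chooseFlagBySpace(int spaceOccupiedByDefaultWriter, int spaceOccupiedBySparseWriter) {
            return spaceOccupiedBySparseWriter < spaceOccupiedByDefaultWriter ? (byte) 1 : (byte) 0;
        }
    }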
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
index a1c60a6..de59836 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.tuples;
 
-import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.HEADER_SIZE;
-
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -29,6 +27,7 @@
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
 import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroReader;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -40,6 +39,7 @@
     private static final String UNSUPPORTED_OPERATION_MSG = "Operation is not supported for column tuples";
     private final int componentIndex;
     protected final ColumnBTreeReadLeafFrame frame;
+    protected final IColumnBufferProvider pageZeroSegmentBufferProvider;
     private final IColumnBufferProvider[] primaryKeyBufferProviders;
     private final IColumnBufferProvider[] filterBufferProviders;
     private final IColumnBufferProvider[] buffersProviders;
@@ -74,6 +74,7 @@
         pinnedPages = new LongOpenHashSet();
         int numberOfFilteredColumns = info.getNumberOfFilteredColumns();
         filterBufferProviders = new IColumnBufferProvider[numberOfFilteredColumns];
+        pageZeroSegmentBufferProvider = new ColumnMultiPageZeroBufferProvider(multiPageOp, pinnedPages);
         for (int i = 0; i < numberOfFilteredColumns; i++) {
             int columnIndex = info.getFilteredColumnIndex(i);
             if (columnIndex < 0) {
@@ -103,8 +104,9 @@
     public void newPage() throws HyracksDataException {
         tupleIndex = 0;
         ByteBuffer pageZero = frame.getBuffer();
+        // should not be needed, as it has just been reset in the step above
         pageZero.clear();
-        pageZero.position(HEADER_SIZE);
+        pageZero.position(frame.getHeaderSize());
 
         int numberOfTuples = frame.getTupleCount();
 
@@ -114,6 +116,13 @@
             provider.reset(frame);
             startPrimaryKey(provider, i, numberOfTuples);
         }
+
+        // if pageZero spans more than one segment, reset the page zero segment buffer provider
+        if (frame.getPageZeroSegmentCount() > 1) {
+            IColumnPageZeroReader pageZeroReader = frame.getColumnPageZeroReader();
+            pageZeroSegmentBufferProvider.reset(frame);
+            pageZeroReader.resetStream(pageZeroSegmentBufferProvider);
+        }
     }
 
     @Override
@@ -249,6 +258,9 @@
             buffersProviders[i].releaseAll();
         }
 
+        // release pageZero segment buffer provider
+        pageZeroSegmentBufferProvider.releaseAll();
+
         maxNumberOfPinnedPages = Math.max(maxNumberOfPinnedPages, pinnedPages.size());
         pinnedPages.clear();
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
index 988d413..8737db3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
@@ -35,13 +35,13 @@
 
 import it.unimi.dsi.fastutil.longs.LongSet;
 
-public final class ColumnMultiBufferProvider implements IColumnBufferProvider {
+public class ColumnMultiBufferProvider implements IColumnBufferProvider {
     private final int columnIndex;
     private final IColumnReadMultiPageOp multiPageOp;
     private final Queue<ICachedPage> pages;
     private final LongSet pinnedPages;
-    private int numberOfRemainingPages;
-    private int startPage;
+    protected int numberOfRemainingPages;
+    protected int startPage;
     private int startOffset;
     private int length;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiPageZeroBufferProvider.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiPageZeroBufferProvider.java
new file mode 100644
index 0000000..1556bea
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiPageZeroBufferProvider.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.tuples;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.longs.LongSet;
+
+public class ColumnMultiPageZeroBufferProvider implements IColumnBufferProvider {
+    private static final BitSet EMPTY_SEGMENTS = new BitSet();
+    private final IColumnReadMultiPageOp multiPageOp;
+    private final LongSet pinnedPages;
+    private final List<ICachedPage> pages; // stores segments 1..n (segment 0, i.e. page zero itself, is not stored here)
+
+    private int startPage;
+    private int numberOfRemainingPages;
+    private BitSet pageZeroSegmentsPages;
+
+    public ColumnMultiPageZeroBufferProvider(IColumnReadMultiPageOp multiPageOp, LongSet pinnedPages) {
+        this.multiPageOp = multiPageOp;
+        this.pinnedPages = pinnedPages;
+        this.pages = new ArrayList<>();
+    }
+
+    @Override
+    public void reset(ColumnBTreeReadLeafFrame frame) throws HyracksDataException {
+        startPage = frame.getPageId() + 1;
+        numberOfRemainingPages = frame.getNumberOfPageZeroSegments() - 1; // zeroth segment is not counted
+        pageZeroSegmentsPages = frame.getPageZeroSegmentsPages();
+        if (pageZeroSegmentsPages == null) {
+            pageZeroSegmentsPages = EMPTY_SEGMENTS;
+        }
+    }
+
+    @Override
+    public void readAll(Queue<ByteBuffer> buffers) throws HyracksDataException {
+        throw new IllegalStateException("Reading all pages is not allowed for zero buffer provider.");
+    }
+
+    public int getNumberOfRemainingPages() {
+        return numberOfRemainingPages;
+    }
+
+    public void readAll(List<ByteBuffer> buffers, Int2IntMap segmentDir) throws HyracksDataException {
+        if (pageZeroSegmentsPages == EMPTY_SEGMENTS) {
+            // All the segments are expected to be present in the flash cache, but they still need to be pinned
+            // into the buffer cache. Should that happen on a per-request basis, or should all the segments be prefetched?
+            return;
+        }
+        // traverse the required segments and read their pages
+        int currentIndex = pageZeroSegmentsPages.nextSetBit(0);
+        while (currentIndex != -1) {
+            int segmentIndex = currentIndex - 1; // convert the 1-based segment number to the 0-based index used by read()
+            ByteBuffer buffer = read(segmentIndex);
+            segmentDir.put(segmentIndex, buffers.size());
+            buffers.add(buffer);
+            currentIndex = pageZeroSegmentsPages.nextSetBit(currentIndex + 1);
+        }
+    }
+
+    public ByteBuffer read(int segmentIndex) throws HyracksDataException {
+        if (segmentIndex < 0 || segmentIndex >= numberOfRemainingPages) {
+            throw new IndexOutOfBoundsException("Segment index out of bounds: " + segmentIndex);
+        }
+        ICachedPage segment = readSegment(segmentIndex);
+        return segment.getBuffer();
+    }
+
+    @Override
+    public void releaseAll() throws HyracksDataException {
+        for (ICachedPage page : pages) {
+            multiPageOp.unpin(page);
+        }
+        pages.clear();
+    }
+
+    @Override
+    public ByteBuffer getBuffer() {
+        throw new UnsupportedOperationException("getBuffer() is not supported for multi-page zero buffer provider.");
+    }
+
+    @Override
+    public int getLength() {
+        throw new IllegalStateException("getLength() is not supported for multi-page zero buffer provider.");
+    }
+
+    private ICachedPage readSegment(int segmentIndex) throws HyracksDataException {
+        // The page segments are most likely to be present in the buffer cache,
+        // as the pages are pinned when a new pageZero is accessed.
+        ICachedPage segmentPage = multiPageOp.pin(startPage + segmentIndex);
+        pages.add(segmentPage);
+        pinnedPages.add(((CachedPage) segmentPage).getDiskPageId());
+        return segmentPage;
+    }
+
+    @Override
+    public int getColumnIndex() {
+        throw new IllegalStateException("getColumnIndex() is not supported for multi-page zero buffer provider.");
+    }
+}
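
A hedged consumer sketch for readAll(List, Int2IntMap) above: each pinned segment's buffer is appended to the list while segmentDir records segmentIndex -> list position, so a later lookup is a map probe plus a list get. The helper below is hypothetical and assumes reset(frame) has already been called on the provider:

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hyracks.api.exceptions.HyracksDataException;

    import it.unimi.dsi.fastutil.ints.Int2IntMap;
    import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;

    final class SegmentDirSketch {
        static ByteBuffer findSegmentBuffer(ColumnMultiPageZeroBufferProvider provider, int segmentIndex)
                throws HyracksDataException {
            List<ByteBuffer> buffers = new ArrayList<>();
            Int2IntMap segmentDir = new Int2IntOpenHashMap();
            segmentDir.defaultReturnValue(-1);
            provider.readAll(buffers, segmentDir); // pins the required segments and fills the directory
            int slot = segmentDir.get(segmentIndex);
            return slot >= 0 ? buffers.get(slot) : null; // -1 => segment was not required/pinned
        }
    }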
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnSingleBufferProvider.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnSingleBufferProvider.java
index 3ae5c7d..cb953fa 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnSingleBufferProvider.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnSingleBufferProvider.java
@@ -37,7 +37,7 @@
     }
 
     @Override
-    public void reset(ColumnBTreeReadLeafFrame frame) {
+    public void reset(ColumnBTreeReadLeafFrame frame) throws HyracksDataException {
         int offset = frame.getColumnOffset(columnIndex);
         this.buffer = frame.getBuffer().duplicate();
         buffer.position(offset);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index a77ccdf..8169fb9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -500,7 +500,8 @@
             for (ICachedPageInternal internalPage : cachedPages) {
                 CachedPage c = (CachedPage) internalPage;
                 if (c != null) {
-                    if (c.confiscated() || c.latch.getReadLockCount() != 0 || c.latch.getWriteHoldCount() != 0) {
+                    if (c.confiscated() || c.latch.getReadLockCount() != 0 || c.latch.getWriteHoldCount() != 0
+                            || c.pinCount.get() != 0) {
                         return false;
                     }
                     if (c.valid) {