[ASTERIXDB-3601][STO] Fixed calculation issues

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
1. isFull calculation
2. sparse column columnToIndexCache reset
3. bounding the offsetColumnIndexPairs per leaf

Ext-ref: MB-66306
Change-Id: I796b74355eca845f006abb5b45789a5136ba8c84
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20013
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Tested-by: Ritik Raj <ritik.raj@couchbase.com>
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiPageZeroByteBuffersReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiPageZeroByteBuffersReader.java
index bb02166..4a59073 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiPageZeroByteBuffersReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiPageZeroByteBuffersReader.java
@@ -82,7 +82,8 @@
         pointable.set(buffer.array(), position, length);
     }
 
-    public void readOffset(long[] offsetColumnIndexPairs, int maxColumnsInZerothSegment, int numberOfColumnsInAPage) {
+    public int readOffset(long[] offsetColumnIndexPairs, int maxColumnsInZerothSegment, int numberOfColumnsInAPage,
+            int currentColumnIndex) {
         int numberOfColumns = offsetColumnIndexPairs.length - 1;
         for (Int2IntMap.Entry pair : segmentDir.int2IntEntrySet()) {
             int segmentIndex = pair.getIntKey();
@@ -92,18 +93,20 @@
             int segmentOffset = 0;
             for (int j = 0; j < numberOfColumnsInAPage; j++) {
                 int columnOffset = buffer.getInt(segmentOffset);
-                offsetColumnIndexPairs[columnIndex] = IntPairUtil.of(columnOffset, columnIndex);
+                offsetColumnIndexPairs[currentColumnIndex] = IntPairUtil.of(columnOffset, columnIndex);
                 segmentOffset += DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+                currentColumnIndex++;
                 columnIndex++;
                 if (columnIndex == numberOfColumns) {
                     break; // No need to read more columns from this buffer.
                 }
             }
         }
+        return currentColumnIndex;
     }
 
-    public void readSparseOffset(long[] offsetColumnIndexPairs, int numberOfPageSegments, int numberOfColumnsInAPage,
-            int numberOfColumnsInLastSegment) {
+    public int readSparseOffset(long[] offsetColumnIndexPairs, int numberOfPageSegments, int numberOfColumnsInAPage,
+            int numberOfColumnsInLastSegment, int currentColumnIndex) {
         for (Int2IntMap.Entry pair : segmentDir.int2IntEntrySet()) {
             int segmentIndex = pair.getIntKey();
             int bufferIndex = pair.getIntValue();
@@ -114,10 +117,11 @@
             for (int j = 0; j < numberOfColumnsInSegment; j++) {
                 int columnIndex = buffer.getInt(segmentOffset);
                 int columnOffset = buffer.getInt(segmentOffset + Integer.BYTES);
-                offsetColumnIndexPairs[columnIndex] = IntPairUtil.of(columnOffset, columnIndex);
+                offsetColumnIndexPairs[currentColumnIndex++] = IntPairUtil.of(columnOffset, columnIndex);
                 segmentOffset += SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
             }
         }
+        return currentColumnIndex;
     }
 
     public void readAllColumns(BitSet presentColumns, int numberOfPageSegments, int numberOfColumnsInAPage,
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
index a1fcfe3..bf76a6b 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
@@ -145,8 +145,8 @@
     }
 
     @Override
-    public int getPageZeroWriterOccupiedSpace(int maxColumnsInPageZerothSegment, boolean includeCurrentTupleColumns,
-            IColumnPageZeroWriter.ColumnPageZeroWriterType writerType) {
+    public int getPageZeroWriterOccupiedSpace(int maxColumnsInPageZerothSegment, int bufferCapacity,
+            boolean includeCurrentTupleColumns, IColumnPageZeroWriter.ColumnPageZeroWriterType writerType) {
         int spaceOccupiedByDefaultWriter;
         int spaceOccupiedBySparseWriter;
 
@@ -157,13 +157,13 @@
             return spaceOccupiedByDefaultWriter;
         } else if (writerType == IColumnPageZeroWriter.ColumnPageZeroWriterType.SPARSE) {
             // Maximum space occupied by the columns = maxColumnsInPageZerothSegment * (offset + filter size)
-            spaceOccupiedBySparseWriter = getSpaceOccupiedBySparseWriter(maxColumnsInPageZerothSegment);
+            spaceOccupiedBySparseWriter = getSpaceOccupiedBySparseWriter(maxColumnsInPageZerothSegment, bufferCapacity);
             return spaceOccupiedBySparseWriter;
         }
 
         spaceOccupiedByDefaultWriter =
                 getSpaceOccupiedByDefaultWriter(maxColumnsInPageZerothSegment, includeCurrentTupleColumns);
-        spaceOccupiedBySparseWriter = getSpaceOccupiedBySparseWriter(maxColumnsInPageZerothSegment);
+        spaceOccupiedBySparseWriter = getSpaceOccupiedBySparseWriter(maxColumnsInPageZerothSegment, bufferCapacity);
         pageZeroWriterFlavorSelector.switchPageZeroWriterIfNeeded(spaceOccupiedByDefaultWriter,
                 spaceOccupiedBySparseWriter);
 
@@ -179,11 +179,14 @@
         return spaceOccupiedByDefaultWriter;
     }
 
-    private int getSpaceOccupiedBySparseWriter(int maxColumnsInPageZerothSegment) {
+    private int getSpaceOccupiedBySparseWriter(int maxColumnsInPageZerothSegment, int bufferCapacity) {
         int presentColumns = transformerForCurrentTuple.getNumberOfVisitedColumnsInBatch();
-        int numberOfPagesRequired = (int) Math.ceil(
-                (double) (presentColumns - maxColumnsInPageZerothSegment) / IColumnPageZeroWriter.MIN_COLUMN_SPACE);
-        int headerSpace = SparseColumnMultiPageZeroWriter.getHeaderSpace(numberOfPagesRequired);
+        int maximumNumberOfColumnsInASegment =
+                SparseColumnMultiPageZeroWriter.getMaximumNumberOfColumnsInAPage(bufferCapacity);
+        int numberOfExtraPagesRequired = presentColumns <= maxColumnsInPageZerothSegment ? 0
+                : (int) Math.ceil(
+                        (double) (presentColumns - maxColumnsInPageZerothSegment) / maximumNumberOfColumnsInASegment);
+        int headerSpace = SparseColumnMultiPageZeroWriter.getHeaderSpace(numberOfExtraPagesRequired);
         presentColumns = Math.min(presentColumns, maxColumnsInPageZerothSegment);
 
         // space occupied by the sparse writer
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
index d31e1d3..fb5cfdb 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
@@ -155,8 +155,8 @@
     }
 
     @Override
-    public int getPageZeroWriterOccupiedSpace(int maxColumnsInPageZerothSegment, boolean includeCurrentTupleColumns,
-            IColumnPageZeroWriter.ColumnPageZeroWriterType writerType) {
+    public int getPageZeroWriterOccupiedSpace(int maxColumnsInPageZerothSegment, int bufferCapacity,
+            boolean includeCurrentTupleColumns, IColumnPageZeroWriter.ColumnPageZeroWriterType writerType) {
         int spaceOccupiedByDefaultWriter;
         int spaceOccupiedBySparseWriter;
 
@@ -167,11 +167,11 @@
             return spaceOccupiedByDefaultWriter;
         } else if (writerType == IColumnPageZeroWriter.ColumnPageZeroWriterType.SPARSE) {
             // Maximum space occupied by the columns = maxColumnsInPageZerothSegment * (offset + filter size)
-            spaceOccupiedBySparseWriter = getSpaceOccupiedBySparseWriter(maxColumnsInPageZerothSegment);
+            spaceOccupiedBySparseWriter = getSpaceOccupiedBySparseWriter(maxColumnsInPageZerothSegment, bufferCapacity);
             return spaceOccupiedBySparseWriter;
         }
 
-        spaceOccupiedBySparseWriter = getSpaceOccupiedBySparseWriter(maxColumnsInPageZerothSegment);
+        spaceOccupiedBySparseWriter = getSpaceOccupiedBySparseWriter(maxColumnsInPageZerothSegment, bufferCapacity);
         spaceOccupiedByDefaultWriter =
                 getSpaceOccupiedByDefaultWriter(maxColumnsInPageZerothSegment, includeCurrentTupleColumns);
         pageZeroWriterFlavorSelector.switchPageZeroWriterIfNeeded(spaceOccupiedByDefaultWriter,
@@ -188,11 +188,14 @@
         return spaceOccupiedByDefaultWriter;
     }
 
-    private int getSpaceOccupiedBySparseWriter(int maxColumnsInPageZerothSegment) {
+    private int getSpaceOccupiedBySparseWriter(int maxColumnsInPageZerothSegment, int bufferCapacity) {
         int presentColumns = presentColumnsIndexes.cardinality();
-        int numberOfPagesRequired = (int) Math.ceil(
-                (double) (presentColumns - maxColumnsInPageZerothSegment) / IColumnPageZeroWriter.MIN_COLUMN_SPACE);
-        int headerSpace = SparseColumnMultiPageZeroWriter.getHeaderSpace(numberOfPagesRequired);
+        int maximumNumberOfColumnsInASegment =
+                SparseColumnMultiPageZeroWriter.getMaximumNumberOfColumnsInAPage(bufferCapacity);
+        int numberOfExtraPagesRequired = presentColumns <= maxColumnsInPageZerothSegment ? 0
+                : (int) Math.ceil(
+                        (double) (presentColumns - maxColumnsInPageZerothSegment) / maximumNumberOfColumnsInASegment);
+        int headerSpace = SparseColumnMultiPageZeroWriter.getHeaderSpace(numberOfExtraPagesRequired);
         presentColumns = Math.min(presentColumns, maxColumnsInPageZerothSegment);
 
         // space occupied by the sparse writer
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java
index a34d8c1..3f729b8 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java
@@ -160,13 +160,14 @@
     }
 
     @Override
-    public void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
+    public int populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
         int columnOffsetStart = headerSize;
-        for (int i = 0; i < offsetColumnIndexPairs.length; i++) {
+        for (int i = 0; i < numberOfPresentColumns; i++) {
             int offset = pageZeroBuf.getInt(columnOffsetStart);
             offsetColumnIndexPairs[i] = IntPairUtil.of(offset, i);
             columnOffsetStart += DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
         }
+        return numberOfPresentColumns;
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/SparseColumnPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/SparseColumnPageZeroReader.java
index 3b4fdc4..5955d5e 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/SparseColumnPageZeroReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/SparseColumnPageZeroReader.java
@@ -42,6 +42,12 @@
     }
 
     @Override
+    public void reset(ByteBuffer pageZeroBuf, int numberOfPresentColumns, int headerSize) {
+        super.reset(pageZeroBuf, numberOfPresentColumns, headerSize);
+        columnIndexToRelativeColumnIndex.clear();
+    }
+
+    @Override
     public int getColumnOffset(int columnIndex) {
         int relativeColumnIndex = getRelativeColumnIndex(columnIndex);
         return pageZeroBuf.getInt(
@@ -137,7 +143,7 @@
     }
 
     @Override
-    public void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
+    public int populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
         int columnIndex = getColumnIndex(0);
         for (int i = 0; i < numberOfPresentColumns; i++) {
             int column = pageZeroBuf.getInt(columnIndex);
@@ -145,5 +151,6 @@
             offsetColumnIndexPairs[i] = IntPairUtil.of(offset, column);
             columnIndex += SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
         }
+        return numberOfPresentColumns;
     }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroReader.java
index 5a0b180..d29daa7 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroReader.java
@@ -230,19 +230,24 @@
     }
 
     @Override
-    public void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
+    public int populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
         int columnOffsetStart = headerSize;
-        for (int i = 0; i < Math.min(offsetColumnIndexPairs.length, zerothSegmentMaxColumns); i++) {
+        int numberOfColumns = getNumberOfPresentColumns();
+        int currentColumnIndex = 0;
+        while (currentColumnIndex < Math.min(numberOfColumns, zerothSegmentMaxColumns)) {
             // search in the 0th segment
             int offset = pageZeroBuf.getInt(columnOffsetStart);
-            offsetColumnIndexPairs[i] = IntPairUtil.of(offset, i);
+            offsetColumnIndexPairs[currentColumnIndex] = IntPairUtil.of(offset, currentColumnIndex);
             columnOffsetStart += DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+            currentColumnIndex++;
         }
 
-        if (offsetColumnIndexPairs.length > zerothSegmentMaxColumns) {
+        if (numberOfColumns > zerothSegmentMaxColumns) {
             // read the rest of the columns from the segment stream
-            segmentBuffers.readOffset(offsetColumnIndexPairs, zerothSegmentMaxColumns, maxNumberOfColumnsInAPage);
+            currentColumnIndex = segmentBuffers.readOffset(offsetColumnIndexPairs, zerothSegmentMaxColumns,
+                    maxNumberOfColumnsInAPage, currentColumnIndex);
         }
+        return currentColumnIndex;
     }
 
     @Override
@@ -251,11 +256,12 @@
         // Not marking the zeroth segment
         if (numberOfPageZeroSegments == 1 || markAll) {
             // mark all segments as required
-            pageZeroSegmentsPages.set(1, numberOfPageZeroSegments);
+            pageZeroSegmentsPages.set(0, numberOfPageZeroSegments);
         } else {
             // Iterate over the projected columns and mark the segments that contain them
             int currentIndex = projectedColumns.nextSetBit(zerothSegmentMaxColumns);
-            while (currentIndex >= 0) {
+            int totalNumberOfColumns = getNumberOfPresentColumns();
+            while (currentIndex >= 0 && currentIndex < totalNumberOfColumns) {
                 int rangeEnd = projectedColumns.nextClearBit(currentIndex); // exclusive
 
                 int fromSegmentIndex = (currentIndex - zerothSegmentMaxColumns) / maxNumberOfColumnsInAPage + 1;
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroReader.java
index 0a0a817..608ff71 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroReader.java
@@ -118,28 +118,24 @@
         // This method finds the segment index (except for 0th segment) for the given columnIndex.
         if (numberOfPageZeroSegments == 1) {
             // only zeroth segment is present
-            return -1;
+            return 0;
         }
         // gives 0 based segment index (0 for zeroth segment, 1 for first segment, etc.)
-        if (columnIndex <= maxColumnIndexInZerothSegment) {
-            return 0;
-        } else {
-            int start = 0;
-            int end = numberOfPageZeroSegments - 1;
-            int resultSegment = -1;
-            while (start <= end) {
-                int mid = (start + end) / 2;
-                int segmentColumnIndex =
-                        pageZeroBuf.getInt(MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET + mid * Integer.BYTES);
-                if (segmentColumnIndex >= columnIndex) {
-                    resultSegment = mid;
-                    end = mid - 1; // continue searching in the left half
-                } else {
-                    start = mid + 1;
-                }
+        int start = 1;
+        int end = numberOfPageZeroSegments - 1;
+        int resultSegment = -1;
+        while (start <= end) {
+            int mid = (start + end) / 2;
+            int segmentColumnIndex =
+                    pageZeroBuf.getInt(MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET + mid * Integer.BYTES);
+            if (segmentColumnIndex >= columnIndex) {
+                resultSegment = mid;
+                end = mid - 1; // continue searching in the left half
+            } else {
+                start = mid + 1;
             }
-            return resultSegment;
         }
+        return resultSegment;
     }
 
     private int findRelativeColumnIndex(int columnIndex) throws HyracksDataException {
@@ -150,7 +146,7 @@
             return zerothSegmentReader.getRelativeColumnIndex(columnIndex);
         } else {
             int segmentIndex = findSegment(columnIndex);
-            if (segmentIndex == -1) {
+            if (segmentIndex <= 0) {
                 return -1;
             }
             segmentIndex -= 1; // Adjusting to get the segment index for the segment stream
@@ -303,23 +299,30 @@
     }
 
     @Override
-    public void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
-        // OffsetColumnIndexPairs is of size getNumberOfPresentColumns() + 1
+    public int populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
+        // offsetColumnIndexPairs >= getNumberOfPresentColumns() + 1 (maybe because of the previous MegaLeaf).
+        // Do not rely on offsetColumnIndexPairs.length, as it may be larger than the number of present columns.
+        // This is because the same array is reused for multiple leaf segments, and previous leaves may have more columns.
         int columnOffsetStart = headerSize;
-        for (int i = 0; i < Math.min(offsetColumnIndexPairs.length - 1, numberOfColumnInZerothSegment); i++) {
+        int currentColumnIndex = 0;
+        int numberOfColumns = getNumberOfPresentColumns();
+        while (currentColumnIndex < Math.min(numberOfColumns, numberOfColumnInZerothSegment)) {
             int columnIndex = pageZeroBuf.getInt(columnOffsetStart);
             int columnOffset = pageZeroBuf.getInt(columnOffsetStart + SparseColumnPageZeroWriter.COLUMN_INDEX_SIZE);
-            offsetColumnIndexPairs[i] = IntPairUtil.of(columnOffset, columnIndex);
+            offsetColumnIndexPairs[currentColumnIndex++] = IntPairUtil.of(columnOffset, columnIndex);
             columnOffsetStart += SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
         }
 
-        if (offsetColumnIndexPairs.length - 1 > numberOfColumnInZerothSegment) {
+        // If the pages are not pinned, we will not read any columnIndex, but the old stuffs will already be present in the offsetColumnIndexPairs.
+        if (numberOfColumns > numberOfColumnInZerothSegment) {
             // read the rest of the columns from the segment stream
             int columnsInLastSegment = getNumberOfPresentColumns() - numberOfColumnInZerothSegment
                     - (numberOfPageZeroSegments - 2) * maxNumberOfColumnsInAPage;
-            segmentBuffers.readSparseOffset(offsetColumnIndexPairs, numberOfPageZeroSegments, maxNumberOfColumnsInAPage,
-                    columnsInLastSegment);
+            currentColumnIndex = segmentBuffers.readSparseOffset(offsetColumnIndexPairs, numberOfPageZeroSegments,
+                    maxNumberOfColumnsInAPage, columnsInLastSegment, currentColumnIndex);
         }
+
+        return currentColumnIndex;
     }
 
     @Override
@@ -343,14 +346,26 @@
         // Not marking the zeroth segment
         if (numberOfPageZeroSegments == 1 || markAll) {
             // mark all segments as required
-            pageZeroSegmentsPages.set(1, numberOfPageZeroSegments);
+            pageZeroSegmentsPages.set(0, numberOfPageZeroSegments);
         } else {
             // Iterate over the projected columns and mark the segments that contain them
             int currentIndex = projectedColumns.nextSetBit(maxColumnIndexInZerothSegment + 1);
             while (currentIndex >= 0) {
                 int rangeEnd = projectedColumns.nextClearBit(currentIndex); // exclusive
                 int startSegmentIndex = findSegment(currentIndex);
+                if (startSegmentIndex == -1) {
+                    //This indicates that the currentIndex > MaxColumnIndex in the last segment
+                    //Hence this leaf doesn't need to pin the segment for requested column ranges.
+
+                    //We can return early as next projectedColumns next set bit will also be out of bounds.
+                    break;
+                }
                 int endSegmentIndex = findSegment(rangeEnd - 1);
+                if (endSegmentIndex == -1) {
+                    //This indicates that the rangeEnd - 1 > MaxColumnIndex in the last segment
+                    //but the startSegmentIndex is valid, hence we may pin to the last segment.
+                    endSegmentIndex = numberOfPageZeroSegments - 1; // Last segment index
+                }
 
                 if (startSegmentIndex <= endSegmentIndex) {
                     pageZeroSegmentsPages.set(startSegmentIndex, endSegmentIndex + 1);
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroWriter.java
index 695ee6e..5753632 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroWriter.java
@@ -96,8 +96,7 @@
         segments = new MultiPersistentPageZeroBufferBytesOutputStream(multiPageOpRef);
         this.zerothSegmentMaxColumns = zerothSegmentMaxColumns;
         this.zerothSegmentWriter = new SparseColumnPageZeroWriter();
-        this.maximumNumberOfColumnsInAPage = bufferCachePageSize
-                / (SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + SparseColumnPageZeroWriter.FILTER_SIZE);
+        this.maximumNumberOfColumnsInAPage = getMaximumNumberOfColumnsInAPage(bufferCachePageSize);
     }
 
     @Override
@@ -282,7 +281,12 @@
         return MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET + numberOfPageZeroSegments * Integer.BYTES;
     }
 
-    public static int getHeaderSpace(int numberOfPageZeroSegments) {
-        return MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET + numberOfPageZeroSegments * Integer.BYTES;
+    public static int getHeaderSpace(int numberOfExtraPagesRequired) {
+        return MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET + numberOfExtraPagesRequired * Integer.BYTES;
+    }
+
+    public static int getMaximumNumberOfColumnsInAPage(int bufferCachePageSize) {
+        return bufferCachePageSize
+                / (SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + SparseColumnPageZeroWriter.FILTER_SIZE);
     }
 }
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
index 6e85d29..1ef865e 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
@@ -202,8 +202,10 @@
         }
         //Reserved for the number of pages
         int requiredFreeSpace = HEADER_SIZE;
+        //Since this test uses DefaultWriter, it does not need the bufferCapacity in the calculation
+        int bufferCapacity = Integer.MAX_VALUE;
         //Columns' Offsets
-        requiredFreeSpace += columnWriter.getPageZeroWriterOccupiedSpace(100, true,
+        requiredFreeSpace += columnWriter.getPageZeroWriterOccupiedSpace(100, bufferCapacity, true,
                 IColumnPageZeroWriter.ColumnPageZeroWriterType.DEFAULT);
         //Occupied space from previous writes
         requiredFreeSpace += columnWriter.getPrimaryKeysEstimatedSize();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
index f5cef05..8b3abd6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
@@ -64,7 +64,7 @@
      *
      * @return the size needed to store columns' offsets
      */
-    public abstract int getPageZeroWriterOccupiedSpace(int maxColumnsInPageZerothSegment,
+    public abstract int getPageZeroWriterOccupiedSpace(int maxColumnsInPageZerothSegment, int bufferCapacity,
             boolean includeCurrentTupleColumns, IColumnPageZeroWriter.ColumnPageZeroWriterType adaptive);
 
     /**
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
index 6cf43bf..ba3cda4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
@@ -112,20 +112,19 @@
         // Ensure arrays capacities (given the leafFrame's columns and pages)
         init();
 
-        // Get the number of columns in a page
-        int numberOfColumns = leafFrame.getNumberOfColumns();
         // Set the first 32-bits to the offset and the second 32-bits to columnIndex
-        leafFrame.populateOffsetColumnIndexPairs(offsetColumnIndexPairs);
+        int numberOfPresentColumnsInLeaf = leafFrame.populateOffsetColumnIndexPairs(offsetColumnIndexPairs);
 
         // Set artificial offset to determine the last column's length
         int megaLeafLength = leafFrame.getMegaLeafNodeLengthInBytes();
-        offsetColumnIndexPairs[numberOfColumns] = IntPairUtil.of(megaLeafLength, numberOfColumns);
+        offsetColumnIndexPairs[numberOfPresentColumnsInLeaf] =
+                IntPairUtil.of(megaLeafLength, numberOfPresentColumnsInLeaf);
 
         // Sort the pairs by offset (i.e., lowest offset first)
-        LongArrays.stableSort(offsetColumnIndexPairs, 0, numberOfColumns, OFFSET_COMPARATOR);
+        LongArrays.stableSort(offsetColumnIndexPairs, 0, numberOfPresentColumnsInLeaf, OFFSET_COMPARATOR);
 
         int columnOrdinal = 0;
-        for (int i = 0; i < numberOfColumns; i++) {
+        for (int i = 0; i < numberOfPresentColumnsInLeaf; i++) {
             if (offsetColumnIndexPairs[i] == 0) {
                 //Any requested column's offset can't be zero
                 //In case a column is not being present in the accessed pageZero segments, it will be defaulted to 0
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
index 2cefab3..5458807 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
@@ -140,8 +140,14 @@
         // pin the required page segments
         mergedPageRanges.clear();
         int pageZeroId = leafFrame.getPageId();
-        BitSet pageZeroSegmentRanges =
-                leafFrame.markRequiredPageZeroSegments(projectedColumns, pageZeroId, operation == MERGE);
+        // Pinning all the segments of the page zero for now,
+        // as the column eviction logic is based on the length of the columns which
+        // gets evaluated from the page zero segments.
+
+        //TODO: find a way to pin only the segments that are required for the operation
+        // or pin all the segments and then unpin the segments that are not required
+        boolean markAll = true || operation == MERGE;
+        BitSet pageZeroSegmentRanges = leafFrame.markRequiredPageZeroSegments(projectedColumns, pageZeroId, markAll);
         // Merge the page zero segments ranges
         mergePageZeroSegmentRanges(pageZeroSegmentRanges);
         mergedPageRanges.pin(columnCtx, bufferCache, fileId, pageZeroId);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
index 8439373..9ca57dd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
@@ -133,8 +133,8 @@
         }
         //Columns' Offsets
         columnWriter.updateColumnMetadataForCurrentTuple(tuple);
-        int requiredFreeSpace =
-                columnWriter.getPageZeroWriterOccupiedSpace(maxColumnsInPageZerothSegment, true, pageZeroWriterType);
+        int requiredFreeSpace = columnWriter.getPageZeroWriterOccupiedSpace(maxColumnsInPageZerothSegment,
+                columnarFrame.getBuffer().capacity(), true, pageZeroWriterType);
         //Occupied space from previous writes
         requiredFreeSpace += columnWriter.getPrimaryKeysEstimatedSize();
         //min and max tuples' sizes
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
index c3f8228..f01b28c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
@@ -119,6 +119,7 @@
     public int getColumnOffset(int columnIndex) throws HyracksDataException {
         // update the exception message.
         if (!columnPageZeroReader.isValidColumn(columnIndex)) {
+            printPageZeroReaderInfo();
             throw new IndexOutOfBoundsException(columnIndex + " >= " + getNumberOfColumns());
         }
         return columnPageZeroReader.getColumnOffset(columnIndex);
@@ -177,8 +178,8 @@
         throw new IllegalArgumentException("Use createTupleReference(int)");
     }
 
-    public void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
-        columnPageZeroReader.populateOffsetColumnIndexPairs(offsetColumnIndexPairs);
+    public int populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
+        return columnPageZeroReader.populateOffsetColumnIndexPairs(offsetColumnIndexPairs);
     }
 
     public BitSet getPageZeroSegmentsPages() {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java
index d9cf11b..fa9b57a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java
@@ -64,7 +64,7 @@
 
     ByteBuffer getPageZeroBuf();
 
-    void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs);
+    int populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs);
 
     int getNumberOfPageZeroSegments();