[ASTERIXDB-3601][STO] Unpinning the not required segmentPages

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
In order to calculate the length/size of the columns,
all the columns' offset is to be known, hence need to
pin all the segment pages, irrespective of being asked
by the query. But, after calculating the length of columns
the segments are unpinned and the buffer is cleared.

Ext-ref: MB-66306
Change-Id: I915031e39a17ac853a24d78809b7742e2ea60163
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20016
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Tested-by: Ritik Raj <ritik.raj@couchbase.com>
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 9815d0e..4910e82 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -491,7 +491,7 @@
       <id>asterix-gerrit-asterix-app</id>
       <properties>
         <test.excludes>
-          **/CloudStorageTest.java,**/CloudStorageCancellationTest.java,**/CloudStorageGCSTest.java,**/CloudStorageUnstableTest.java,
+          **/CloudStorageTest.java,**/CloudStorageSparseTest,**/CloudStorageCancellationTest.java,**/CloudStorageGCSTest.java,**/CloudStorageUnstableTest.java,
           **/SqlppExecutionWithCancellationTest.java,**/DmlTest.java,**/RepeatedTest.java,**/SqlppExecutionTest.java,
           **/SqlppExecutionColumnTest.java,**/*StaticPartitioning*Test.java,**/*Ssl*Test.java,**/Podman*.java,
           **/*AnalyzedExecutionTest.java,**/SqlppProfiledExecutionTest.java,**/CloudPythonTest.java,
@@ -612,6 +612,7 @@
       <properties>
         <test.includes>
           **/CloudStorageTest.java,
+          **/CloudStorageSparseTest.java,
           **/CloudStorageCancellationTest.java,
           **/SqlppSinglePointLookupExecutionTest.java, **/AwsS3*.java
         </test.includes>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageSparseTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageSparseTest.java
new file mode 100644
index 0000000..b7a264b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageSparseTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.cloud_storage;
+
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.asterix.api.common.LocalCloudUtilAdobeMock;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.runtime.ExecutionTestUtil;
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.Description;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+
+/**
+ * Run tests in cloud deployment environment
+ */
+@RunWith(Parameterized.class)
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class CloudStorageSparseTest {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private final TestCaseContext tcCtx;
+    public static final String SUITE_TESTS = "testsuite_cloud_storage.xml";
+    public static final String ONLY_TESTS = "testsuite_cloud_storage_only.xml";
+    public static final String CONFIG_FILE_NAME = "src/test/resources/cc-cloud-storage-sparse.conf";
+    public static final String DELTA_RESULT_PATH = "results_cloud";
+    public static final String EXCLUDED_TESTS = "MP";
+
+    public static final String PLAYGROUND_CONTAINER = "playground";
+    public static final String MOCK_SERVER_REGION = "us-west-2";
+    public static final int MOCK_SERVER_PORT = 8001;
+    public static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:" + MOCK_SERVER_PORT;
+
+    public CloudStorageSparseTest(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        TestExecutor testExecutor = new TestExecutor(DELTA_RESULT_PATH);
+        setupEnv(testExecutor);
+    }
+
+    public static void setupEnv(TestExecutor testExecutor) throws Exception {
+        LocalCloudUtilAdobeMock.startS3CloudEnvironment(true);
+        testExecutor.executorId = "cloud";
+        testExecutor.stripSubstring = "//DB:";
+        LangExecutionUtil.setUp(CONFIG_FILE_NAME, testExecutor);
+        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, CONFIG_FILE_NAME);
+
+        // create the playground bucket and leave it empty, just for external collection-based tests
+        S3ClientBuilder builder = S3Client.builder();
+        URI endpoint = URI.create(MOCK_SERVER_HOSTNAME); // endpoint pointing to S3 mock server
+        builder.region(Region.of(MOCK_SERVER_REGION)).credentialsProvider(AnonymousCredentialsProvider.create())
+                .endpointOverride(endpoint);
+        S3Client client = builder.build();
+        client.createBucket(CreateBucketRequest.builder().bucket(PLAYGROUND_CONTAINER).build());
+        client.close();
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        LangExecutionUtil.tearDown();
+        LocalCloudUtilAdobeMock.shutdownSilently();
+    }
+
+    @Parameters(name = "CloudStorageSparseTest {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        return LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS);
+    }
+
+    @Test
+    public void test() throws Exception {
+        List<TestCase.CompilationUnit> cu = tcCtx.getTestCase().getCompilationUnit();
+        Assume.assumeTrue(cu.size() > 1 || !EXCLUDED_TESTS.equals(getText(cu.get(0).getDescription())));
+        LangExecutionUtil.test(tcCtx);
+        IBufferCache bufferCache;
+        for (NodeControllerService nc : ExecutionTestUtil.integrationUtil.ncs) {
+            bufferCache = ((INcApplicationContext) nc.getApplicationContext()).getBufferCache();
+            Assert.assertTrue(((BufferCache) bufferCache).isClean());
+        }
+    }
+
+    private static String getText(Description description) {
+        return description == null ? "" : description.getValue();
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppSinglePartitionExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppSinglePartitionExecutionTest.java
index d0823c7..6f19393 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppSinglePartitionExecutionTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppSinglePartitionExecutionTest.java
@@ -28,7 +28,10 @@
 import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
 import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -68,6 +71,11 @@
     @Test
     public void test() throws Exception {
         LangExecutionUtil.test(tcCtx);
+        IBufferCache bufferCache;
+        for (NodeControllerService nc : ExecutionTestUtil.integrationUtil.ncs) {
+            bufferCache = ((INcApplicationContext) nc.getApplicationContext()).getBufferCache();
+            Assert.assertTrue(((BufferCache) bufferCache).isClean());
+        }
     }
 
     private static void setNcEndpoints(TestExecutor testExecutor) {
diff --git a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-sparse.conf b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-sparse.conf
new file mode 100644
index 0000000..0842e01
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-sparse.conf
@@ -0,0 +1,77 @@
+; Licensed to the Apache Software Foundation (ASF) under one
+; or more contributor license agreements.  See the NOTICE file
+; distributed with this work for additional information
+; regarding copyright ownership.  The ASF licenses this file
+; to you under the Apache License, Version 2.0 (the
+; "License"); you may not use this file except in compliance
+; with the License.  You may obtain a copy of the License at
+;
+;   http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing,
+; software distributed under the License is distributed on an
+; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+; KIND, either express or implied.  See the License for the
+; specific language governing permissions and limitations
+; under the License.
+
+[nc/asterix_nc1]
+txn.log.dir=target/tmp/asterix_nc1/txnlog
+core.dump.dir=target/tmp/asterix_nc1/coredump
+iodevices=target/tmp/asterix_nc1/iodevice1
+iodevices=../asterix-server/target/tmp/asterix_nc1/iodevice2
+nc.api.port=19004
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006
+
+[nc/asterix_nc2]
+ncservice.port=9091
+txn.log.dir=target/tmp/asterix_nc2/txnlog
+core.dump.dir=target/tmp/asterix_nc2/coredump
+iodevices=target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2
+nc.api.port=19005
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007
+
+[nc]
+credential.file=src/test/resources/security/passwd
+python.cmd.autolocate=true
+python.env=FOO=BAR=BAZ,BAR=BAZ
+address=127.0.0.1
+command=asterixnc
+app.class=org.apache.asterix.hyracks.bootstrap.NCApplication
+jvm.args=-Xmx4096m --add-opens=jdk.management/com.sun.management.internal=ALL-UNNAMED --add-opens=java.management/sun.management=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory"
+storage.buffercache.size=128MB
+storage.memorycomponent.globalbudget=512MB
+storage.max.columns.in.zeroth.segment=800
+storage.page.zero.writer=sparse
+
+[cc]
+address = 127.0.0.1
+app.class=org.apache.asterix.hyracks.bootstrap.CCApplication
+heartbeat.period=2000
+heartbeat.max.misses=25
+credential.file=src/test/resources/security/passwd
+
+[common]
+log.dir = logs/
+log.level = INFO
+compiler.framesize=32KB
+compiler.sortmemory=320KB
+compiler.groupmemory=160KB
+compiler.joinmemory=256KB
+compiler.textsearchmemory=160KB
+compiler.windowmemory=192KB
+compiler.ordered.fields=false
+compiler.internal.sanitycheck=true
+messaging.frame.size=4096
+messaging.frame.count=512
+cloud.deployment=true
+storage.buffercache.pagesize=32KB
+storage.partitioning=static
+cloud.storage.scheme=s3
+cloud.storage.bucket=cloud-storage-container
+cloud.storage.region=us-west-2
+cloud.storage.endpoint=http://127.0.0.1:8001
+cloud.storage.anonymous.auth=true
+cloud.storage.cache.policy=selective
+cloud.max.write.requests.per.second=2000
+cloud.max.read.requests.per.second=4000
diff --git a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf
index 551d2c4..b3d1e2d 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf
@@ -42,6 +42,7 @@
 storage.buffercache.size=128MB
 storage.memorycomponent.globalbudget=512MB
 storage.max.columns.in.zeroth.segment=800
+storage.page.zero.writer=default
 
 [cc]
 address = 127.0.0.1
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiPageZeroByteBuffersReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiPageZeroByteBuffersReader.java
index 4a59073..1ab3052 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiPageZeroByteBuffersReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiPageZeroByteBuffersReader.java
@@ -34,9 +34,12 @@
 
 import it.unimi.dsi.fastutil.ints.Int2IntMap;
 import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
 
 public final class MultiPageZeroByteBuffersReader {
     private static final ByteBuffer EMPTY;
+    private final IntList notRequiredSegmentsIndexes;
     private ColumnMultiPageZeroBufferProvider bufferProvider;
     private final Int2IntMap segmentDir; // should I just create a buffer[numberOfSegments] instead?
     private int maxBuffersSize;
@@ -51,15 +54,15 @@
     public MultiPageZeroByteBuffersReader() {
         this.buffers = new ArrayList<>();
         segmentDir = new Int2IntOpenHashMap();
+        notRequiredSegmentsIndexes = new IntArrayList();
         segmentDir.defaultReturnValue(-1);
     }
 
-    public void reset(IColumnBufferProvider bufferProvider) throws HyracksDataException {
-        ColumnMultiPageZeroBufferProvider pageZeroBufferProvider = (ColumnMultiPageZeroBufferProvider) bufferProvider;
+    public void reset(IColumnBufferProvider pageZeroBufferProvider) throws HyracksDataException {
         reset();
-        this.bufferProvider = pageZeroBufferProvider;
-        maxBuffersSize = pageZeroBufferProvider.getNumberOfRemainingPages();
-        pageZeroBufferProvider.readAll(buffers, segmentDir);
+        this.bufferProvider = (ColumnMultiPageZeroBufferProvider) pageZeroBufferProvider;
+        maxBuffersSize = bufferProvider.getNumberOfRemainingPages();
+        bufferProvider.readAll(buffers, segmentDir);
     }
 
     public void read(int segmentIndex, IPointable pointable, int position, int length)
@@ -147,6 +150,32 @@
         }
     }
 
+    public void unPinNotRequiredSegments(BitSet pageZeroSegmentsPages, int numberOfPageZeroSegments)
+            throws HyracksDataException {
+        if (numberOfPageZeroSegments <= 1) {
+            // If there is only one segment, it is always pinned.
+            // So no need to unpin the segments.
+            return;
+        }
+        notRequiredSegmentsIndexes.clear();
+        // Start checking from index 1 (0th segment is always pinned)
+        int i = pageZeroSegmentsPages.nextClearBit(1);
+        while (i >= 1 && i < numberOfPageZeroSegments) {
+            int segmentIndex = i - 1; // Adjusted index for segmentDir
+
+            int bufferIndex = segmentDir.get(segmentIndex);
+            if (bufferIndex != -1) {
+                buffers.set(bufferIndex, EMPTY);
+                notRequiredSegmentsIndexes.add(bufferIndex);
+                segmentDir.remove(segmentIndex);
+            }
+
+            i = pageZeroSegmentsPages.nextClearBit(i + 1);
+        }
+        // Unpin the buffers that are not required anymore.
+        bufferProvider.releasePages(notRequiredSegmentsIndexes);
+    }
+
     public int findColumnIndexInSegment(int segmentIndex, int columnIndex, int numberOfColumnsInSegment)
             throws HyracksDataException {
         if (segmentIndex < 0 || segmentIndex >= maxBuffersSize) {
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/PageZeroWriterFlavorSelector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/PageZeroWriterFlavorSelector.java
index 831e459..5deead4 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/PageZeroWriterFlavorSelector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/PageZeroWriterFlavorSelector.java
@@ -50,7 +50,7 @@
  * space requirements of each approach.
  */
 public class PageZeroWriterFlavorSelector implements IColumnPageZeroWriterFlavorSelector {
-    protected byte writerFlag = IColumnPageZeroWriter.ColumnPageZeroWriterType.ADAPTIVE.getWriterFlag();
+    protected byte writerFlag = IColumnPageZeroWriter.ColumnPageZeroWriterType.DEFAULT.getWriterFlag();
 
     // Cache of writer instances to avoid repeated object creation
     private final Byte2ObjectArrayMap<IColumnPageZeroWriter> writers;
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java
index 3f729b8..d756a6c 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java
@@ -30,6 +30,7 @@
 import java.util.BitSet;
 
 import org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
 import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IntPairUtil;
 import org.apache.hyracks.storage.am.lsm.btree.column.error.ColumnarValueException;
@@ -43,12 +44,11 @@
     protected static Logger LOGGER = LogManager.getLogger();
 
     protected ByteBuffer pageZeroBuf;
-    protected BitSet pageZeroSegmentsPages;
+    protected static final BitSet EMPTY_SEGMENTS = new BitSet();
     protected int numberOfPresentColumns;
     protected int headerSize;
 
     public DefaultColumnPageZeroReader() {
-        this.pageZeroSegmentsPages = new BitSet();
     }
 
     @Override
@@ -172,7 +172,7 @@
 
     @Override
     public BitSet getPageZeroSegmentsPages() {
-        return pageZeroSegmentsPages;
+        return EMPTY_SEGMENTS;
     }
 
     @Override
@@ -187,9 +187,12 @@
 
     @Override
     public BitSet markRequiredPageSegments(BitSet projectedColumns, int pageZeroId, boolean markAll) {
-        pageZeroSegmentsPages.clear();
-        pageZeroSegmentsPages.set(0);
-        return pageZeroSegmentsPages;
+        return EMPTY_SEGMENTS;
+    }
+
+    @Override
+    public void unPinNotRequiredPageZeroSegments() throws HyracksDataException {
+        // No-OP
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroReader.java
index d29daa7..d4ffbb4 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroReader.java
@@ -60,8 +60,8 @@
         super();
         zerothSegmentReader = new DefaultColumnPageZeroReader();
         this.pageZeroSegmentsPages = new BitSet();
-        this.maxNumberOfColumnsInAPage = bufferCapacity
-                / (DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
+        this.maxNumberOfColumnsInAPage =
+                DefaultColumnMultiPageZeroWriter.getMaximumNumberOfColumnsInAPage(bufferCapacity);
         this.offsetPointable = new VoidPointable();
     }
 
@@ -256,7 +256,7 @@
         // Not marking the zeroth segment
         if (numberOfPageZeroSegments == 1 || markAll) {
             // mark all segments as required
-            pageZeroSegmentsPages.set(0, numberOfPageZeroSegments);
+            pageZeroSegmentsPages.set(1, numberOfPageZeroSegments);
         } else {
             // Iterate over the projected columns and mark the segments that contain them
             int currentIndex = projectedColumns.nextSetBit(zerothSegmentMaxColumns);
@@ -279,6 +279,11 @@
     }
 
     @Override
+    public void unPinNotRequiredPageZeroSegments() throws HyracksDataException {
+        segmentBuffers.unPinNotRequiredSegments(pageZeroSegmentsPages, numberOfPageZeroSegments);
+    }
+
+    @Override
     public int getHeaderSize() {
         return EXTENDED_HEADER_SIZE;
     }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroWriter.java
index 7f16d5e..5c7d383 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroWriter.java
@@ -95,8 +95,7 @@
         segments = new MultiPersistentPageZeroBufferBytesOutputStream(multiPageOpRef); // should this be populated at reset?
         this.zerothSegmentWriter = new DefaultColumnPageZeroWriter();
         this.zerothSegmentMaxColumns = zerothSegmentMaxColumns;
-        this.maximumNumberOfColumnsInAPage = bufferCapacity
-                / (DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
+        this.maximumNumberOfColumnsInAPage = getMaximumNumberOfColumnsInAPage(bufferCapacity);
     }
 
     @Override
@@ -263,4 +262,9 @@
     public int getHeaderSize() {
         return EXTENDED_HEADER_SIZE;
     }
+
+    public static int getMaximumNumberOfColumnsInAPage(int bufferCapacity) {
+        return bufferCapacity
+                / (DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
+    }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroReader.java
index 608ff71..035db5d 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroReader.java
@@ -64,8 +64,8 @@
         super();
         zerothSegmentReader = new SparseColumnPageZeroReader();
         this.pageZeroSegmentsPages = new BitSet();
-        this.maxNumberOfColumnsInAPage = bufferCapacity
-                / (SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + SparseColumnPageZeroWriter.FILTER_SIZE);
+        this.maxNumberOfColumnsInAPage =
+                SparseColumnMultiPageZeroWriter.getMaximumNumberOfColumnsInAPage(bufferCapacity);
         this.offsetPointable = new VoidPointable();
         this.columnIndexToRelativeColumnIndex = new Int2IntOpenHashMap();
         columnIndexToRelativeColumnIndex.defaultReturnValue(-1);
@@ -346,7 +346,7 @@
         // Not marking the zeroth segment
         if (numberOfPageZeroSegments == 1 || markAll) {
             // mark all segments as required
-            pageZeroSegmentsPages.set(0, numberOfPageZeroSegments);
+            pageZeroSegmentsPages.set(1, numberOfPageZeroSegments);
         } else {
             // Iterate over the projected columns and mark the segments that contain them
             int currentIndex = projectedColumns.nextSetBit(maxColumnIndexInZerothSegment + 1);
@@ -378,6 +378,11 @@
     }
 
     @Override
+    public void unPinNotRequiredPageZeroSegments() throws HyracksDataException {
+        segmentBuffers.unPinNotRequiredSegments(pageZeroSegmentsPages, numberOfPageZeroSegments);
+    }
+
+    @Override
     public void printPageZeroReaderInfo() {
         ColumnarValueException ex = new ColumnarValueException();
         ObjectNode readerNode = ex.createNode(getClass().getSimpleName());
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
index ba3cda4..87b35bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
@@ -107,64 +107,69 @@
      */
     public void reset(ColumnBTreeReadLeafFrame leafFrame, BitSet requestedColumns, BitSet evictableColumns,
             BitSet cloudOnlyColumns) throws HyracksDataException {
-        // Set leafFrame
-        this.leafFrame = leafFrame;
-        // Ensure arrays capacities (given the leafFrame's columns and pages)
-        init();
+        try {
+            // Set leafFrame
+            this.leafFrame = leafFrame;
+            // Ensure arrays capacities (given the leafFrame's columns and pages)
+            init();
 
-        // Set the first 32-bits to the offset and the second 32-bits to columnIndex
-        int numberOfPresentColumnsInLeaf = leafFrame.populateOffsetColumnIndexPairs(offsetColumnIndexPairs);
+            // Set the first 32-bits to the offset and the second 32-bits to columnIndex
+            int numberOfPresentColumnsInLeaf = leafFrame.populateOffsetColumnIndexPairs(offsetColumnIndexPairs);
 
-        // Set artificial offset to determine the last column's length
-        int megaLeafLength = leafFrame.getMegaLeafNodeLengthInBytes();
-        offsetColumnIndexPairs[numberOfPresentColumnsInLeaf] =
-                IntPairUtil.of(megaLeafLength, numberOfPresentColumnsInLeaf);
+            // Set artificial offset to determine the last column's length
+            int megaLeafLength = leafFrame.getMegaLeafNodeLengthInBytes();
+            offsetColumnIndexPairs[numberOfPresentColumnsInLeaf] =
+                    IntPairUtil.of(megaLeafLength, numberOfPresentColumnsInLeaf);
 
-        // Sort the pairs by offset (i.e., lowest offset first)
-        LongArrays.stableSort(offsetColumnIndexPairs, 0, numberOfPresentColumnsInLeaf, OFFSET_COMPARATOR);
+            // Sort the pairs by offset (i.e., lowest offset first)
+            LongArrays.stableSort(offsetColumnIndexPairs, 0, numberOfPresentColumnsInLeaf, OFFSET_COMPARATOR);
 
-        int columnOrdinal = 0;
-        for (int i = 0; i < numberOfPresentColumnsInLeaf; i++) {
-            if (offsetColumnIndexPairs[i] == 0) {
-                //Any requested column's offset can't be zero
-                //In case a column is not being present in the accessed pageZero segments, it will be defaulted to 0
-                continue;
-            }
-            int columnIndex = getColumnIndexFromPair(offsetColumnIndexPairs[i]);
-            int offset = getOffsetFromPair(offsetColumnIndexPairs[i]);
-            int nextOffset = getOffsetFromPair(offsetColumnIndexPairs[i + 1]);
+            int columnOrdinal = 0;
+            for (int i = 0; i < numberOfPresentColumnsInLeaf; i++) {
+                if (offsetColumnIndexPairs[i] == 0) {
+                    //Any requested column's offset can't be zero
+                    //In case a column is not being present in the accessed pageZero segments, it will be defaulted to 0
+                    continue;
+                }
+                int columnIndex = getColumnIndexFromPair(offsetColumnIndexPairs[i]);
+                int offset = getOffsetFromPair(offsetColumnIndexPairs[i]);
+                int nextOffset = getOffsetFromPair(offsetColumnIndexPairs[i + 1]);
 
-            // Compute the column's length in bytes (set 0 for PKs)
-            int length = columnIndex < numberOfPrimaryKeys ? 0 : nextOffset - offset;
-            // In case of sparse columns, few columnIndexes can be greater than the total sparse column count.
-            ensureCapacity(columnIndex);
-            lengths[columnIndex] = length;
+                // Compute the column's length in bytes (set 0 for PKs)
+                int length = columnIndex < numberOfPrimaryKeys ? 0 : nextOffset - offset;
+                // In case of sparse columns, few columnIndexes can be greater than the total sparse column count.
+                ensureCapacity(columnIndex);
+                lengths[columnIndex] = length;
 
-            // Get start page ID (given the computed length above)
-            int startPageId = getColumnStartPageIndex(columnIndex);
-            // Get the number of pages (given the computed length above)
-            int numberOfPages = getColumnNumberOfPages(columnIndex);
+                // Get start page ID (given the computed length above)
+                int startPageId = getColumnStartPageIndex(columnIndex);
+                // Get the number of pages (given the computed length above)
+                int numberOfPages = getColumnNumberOfPages(columnIndex);
 
-            if (columnIndex >= numberOfPrimaryKeys && requestedColumns.get(columnIndex)) {
-                // Set column index
-                columnsOrder[columnOrdinal++] = columnIndex;
-                // Compute cloud-only and evictable pages
-                setCloudOnlyAndEvictablePages(columnIndex, cloudOnlyColumns, evictableColumns, startPageId,
-                        numberOfPages);
-                // A requested column. Keep its pages as requested
-                continue;
+                if (columnIndex >= numberOfPrimaryKeys && requestedColumns.get(columnIndex)) {
+                    // Set column index
+                    columnsOrder[columnOrdinal++] = columnIndex;
+                    // Compute cloud-only and evictable pages
+                    setCloudOnlyAndEvictablePages(columnIndex, cloudOnlyColumns, evictableColumns, startPageId,
+                            numberOfPages);
+                    // A requested column. Keep its pages as requested
+                    continue;
+                }
+
+                // Mark the page as non-evictable
+                for (int j = startPageId; j < startPageId + numberOfPages; j++) {
+                    nonEvictablePages.set(j);
+                }
             }
 
-            // Mark the page as non-evictable
-            for (int j = startPageId; j < startPageId + numberOfPages; j++) {
-                nonEvictablePages.set(j);
-            }
+            // Bound the nonRequestedPages to the number of pages in the mega leaf node
+            nonEvictablePages.set(leafFrame.getMegaLeafNodeNumberOfPages());
+            // to indicate the end
+            columnsOrder[columnOrdinal] = -1;
+        } finally {
+            //Unpin the not required segment pages
+            leafFrame.unPinNotRequiredPageZeroSegments();
         }
-
-        // Bound the nonRequestedPages to the number of pages in the mega leaf node
-        nonEvictablePages.set(leafFrame.getMegaLeafNodeNumberOfPages());
-        // to indicate the end
-        columnsOrder[columnOrdinal] = -1;
     }
 
     /**
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
index 5458807..181aa06 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
@@ -138,19 +138,17 @@
         }
 
         // pin the required page segments
-        mergedPageRanges.clear();
+        //        mergedPageRanges.clear();
         int pageZeroId = leafFrame.getPageId();
-        // Pinning all the segments of the page zero for now,
+        // Pinning all the segments of the page zero
         // as the column eviction logic is based on the length of the columns which
         // gets evaluated from the page zero segments.
-
-        //TODO: find a way to pin only the segments that are required for the operation
-        // or pin all the segments and then unpin the segments that are not required
-        boolean markAll = true || operation == MERGE;
-        BitSet pageZeroSegmentRanges = leafFrame.markRequiredPageZeroSegments(projectedColumns, pageZeroId, markAll);
-        // Merge the page zero segments ranges
-        mergePageZeroSegmentRanges(pageZeroSegmentRanges);
-        mergedPageRanges.pin(columnCtx, bufferCache, fileId, pageZeroId);
+        BitSet pageZeroSegmentRanges =
+                leafFrame.markRequiredPageZeroSegments(projectedColumns, pageZeroId, operation == MERGE);
+        // will unpin the non-required segments after columnRanges.reset()
+        // can we do lazily?
+        int numberOfPageZeroSegments = leafFrame.getNumberOfPageZeroSegments();
+        pinAll(fileId, pageZeroId, numberOfPageZeroSegments - 1, bufferCache);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
index f01b28c..3307423 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
@@ -190,6 +190,10 @@
         return columnPageZeroReader.markRequiredPageSegments(projectedColumns, pageZeroId, markAll);
     }
 
+    public void unPinNotRequiredPageZeroSegments() throws HyracksDataException {
+        columnPageZeroReader.unPinNotRequiredPageZeroSegments();
+    }
+
     public void printPageZeroReaderInfo() {
         columnPageZeroReader.printPageZeroReaderInfo();
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java
index fa9b57a..f57eba3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java
@@ -77,4 +77,6 @@
     BitSet markRequiredPageSegments(BitSet projectedColumns, int pageZeroId, boolean markAll);
 
     void printPageZeroReaderInfo();
+
+    void unPinNotRequiredPageZeroSegments() throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiPageZeroBufferProvider.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiPageZeroBufferProvider.java
index 1556bea..c60ca79 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiPageZeroBufferProvider.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiPageZeroBufferProvider.java
@@ -32,6 +32,7 @@
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 
 import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.IntList;
 import it.unimi.dsi.fastutil.longs.LongSet;
 
 public class ColumnMultiPageZeroBufferProvider implements IColumnBufferProvider {
@@ -75,14 +76,13 @@
             // will do on request basis? or prefetch all the segments?
             return;
         }
-        // traverse the set segments and read the pages
-        int currentIndex = pageZeroSegmentsPages.nextSetBit(0);
-        while (currentIndex != -1) {
-            int segmentIndex = currentIndex - 1; // segmentIndex starts from 1
+        //Since all the pageSegments are pinned for calculating the lengths of the columns,
+        //read all the segments and store them in the buffers list.
+        //after ColumnRanges.reset(), unpin the segments that are not required.
+        for (int segmentIndex = 0; segmentIndex < numberOfRemainingPages; segmentIndex++) {
             ByteBuffer buffer = read(segmentIndex);
             segmentDir.put(segmentIndex, buffers.size());
             buffers.add(buffer);
-            currentIndex = pageZeroSegmentsPages.nextSetBit(currentIndex + 1);
         }
     }
 
@@ -97,11 +97,41 @@
     @Override
     public void releaseAll() throws HyracksDataException {
         for (ICachedPage page : pages) {
-            multiPageOp.unpin(page);
+            if (page != null) {
+                multiPageOp.unpin(page);
+            }
         }
         pages.clear();
     }
 
+    public void releasePages(IntList notRequiredSegmentsIndexes) throws HyracksDataException {
+        //From the list of cached pages, remove those pages.
+        //Pages and buffers list are in sync, so we can use the same indexes.
+        Throwable th = null;
+        for (int pageIndex : notRequiredSegmentsIndexes) {
+            if (pageIndex < 0 || pageIndex >= pages.size()) {
+                throw new IndexOutOfBoundsException("Page index out of bounds: " + pageIndex);
+            }
+            try {
+                ICachedPage page = pages.get(pageIndex);
+                if (page != null) {
+                    multiPageOp.unpin(page);
+                    pinnedPages.remove(((CachedPage) page).getDiskPageId());
+                    pages.set(pageIndex, null); // Clear the reference
+                }
+            } catch (Exception e) {
+                if (th == null) {
+                    th = e;
+                } else {
+                    th.addSuppressed(e);
+                }
+            }
+        }
+        if (th != null) {
+            throw HyracksDataException.create(th);
+        }
+    }
+
     @Override
     public ByteBuffer getBuffer() {
         throw new UnsupportedOperationException("getBuffer() is not supported for multi-page zero buffer provider.");