[ASTERIXDB-3421][STO] Multiple fixes in cloud disk caching

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Metadata partition should not be evicted
- Buffer cache read context should call onPin(ICachedPage) in a synchronized block on the page being pinned
- Written pages should contain the compressed page size and offset
- Fix multiple issues in calculating the page IDs of the requested columns

Change-Id: Ic94cc1e63ee4618b18c6d4c6e3e74101a7753400
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18352
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
index 0f9a4de..3c0f1df 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
@@ -28,6 +28,7 @@
 import org.apache.asterix.common.cloud.IPartitionBootstrapper;
 import org.apache.asterix.common.config.CloudProperties;
 import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.config.IApplicationConfig;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -37,6 +38,7 @@
 import org.apache.hyracks.cloud.cache.service.CloudDiskCacheMonitoringAndPrefetchingService;
 import org.apache.hyracks.cloud.cache.service.CloudDiskResourceCacheLockNotifier;
 import org.apache.hyracks.cloud.cache.service.DiskCacheSweeperThread;
+import org.apache.hyracks.cloud.cache.service.IEvictableLocalResourceFilter;
 import org.apache.hyracks.cloud.filesystem.PhysicalDrive;
 import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.nc.io.IOManager;
@@ -55,6 +57,8 @@
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 
 public final class CloudConfigurator {
+    private static final IEvictableLocalResourceFilter FILTER =
+            (x -> StoragePathUtil.getPartitionNumFromRelativePath(x.getPath()) != StorageConstants.METADATA_PARTITION);
     private final CloudProperties cloudProperties;
     private final IOManager localIoManager;
     private final AbstractCloudIOManager cloudIOManager;
@@ -157,7 +161,7 @@
 
     private static IDiskResourceCacheLockNotifier createLockNotifier(boolean diskCacheManagerRequired) {
         if (diskCacheManagerRequired) {
-            return new CloudDiskResourceCacheLockNotifier(StorageConstants.METADATA_PARTITION);
+            return new CloudDiskResourceCacheLockNotifier(FILTER);
         }
 
         return NoOpDiskResourceCacheLockNotifier.INSTANCE;
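
Note: the eviction policy is now expressed as an IEvictableLocalResourceFilter predicate instead of a hard-coded metadata-partition number, so CloudConfigurator decides what is evictable and the notifier merely applies the filter. A minimal stand-alone sketch of the intended semantics, using a simplified stand-in for StoragePathUtil.getPartitionNumFromRelativePath and an assumed metadata partition number:

    import java.util.function.Predicate;

    // Stand-alone sketch (not the production types): a resource is evictable
    // iff it does not live on the metadata partition.
    public final class EvictableFilterSketch {
        static final int METADATA_PARTITION = 0; // assumed value, for illustration only

        // Simplified stand-in for StoragePathUtil.getPartitionNumFromRelativePath
        static int partitionNum(String relativePath) {
            for (String segment : relativePath.split("/")) {
                if (segment.startsWith("partition_")) {
                    return Integer.parseInt(segment.substring("partition_".length()));
                }
            }
            return -1;
        }

        public static void main(String[] args) {
            Predicate<String> evictable = path -> partitionNum(path) != METADATA_PARTITION;
            System.out.println(evictable.test("storage/partition_0/Metadata/Dataset")); // false: kept
            System.out.println(evictable.test("storage/partition_2/tpch/orders"));      // true: evictable
        }
    }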
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index f395362..97169db 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -120,7 +120,7 @@
     public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException {
         guardian.checkReadAccess(bucket, path);
         profiler.objectGet();
-        long readTo = offset + buffer.remaining();
+        long readTo = offset + buffer.remaining() - 1;
         GetObjectRequest rangeGetObjectRequest =
                 GetObjectRequest.builder().range("bytes=" + offset + "-" + readTo).bucket(bucket).key(path).build();
 
@@ -163,7 +163,7 @@
     public InputStream getObjectStream(String bucket, String path, long offset, long length) {
         guardian.checkReadAccess(bucket, path);
         profiler.objectGet();
-        long readTo = offset + length;
+        long readTo = offset + length - 1;
         GetObjectRequest getReq =
                 GetObjectRequest.builder().range("bytes=" + offset + "-" + readTo).bucket(bucket).key(path).build();
         try {
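
Note: HTTP Range requests use inclusive end offsets (RFC 9110), so reading length bytes starting at offset must request bytes=offset-(offset+length-1); before the two fixes above, each ranged GET asked for one byte too many. A stand-alone sketch:

    // Stand-alone sketch: building an inclusive byte-range header for a ranged GET.
    public final class RangeHeaderSketch {
        static String rangeHeader(long offset, long length) {
            long readTo = offset + length - 1; // inclusive end offset
            return "bytes=" + offset + "-" + readTo;
        }

        public static void main(String[] args) {
            // A 4096-byte read at offset 0 covers bytes 0..4095, not 0..4096.
            System.out.println(rangeHeader(0, 4096)); // bytes=0-4095
        }
    }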
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
index 218015d..d8aecc1 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
@@ -48,6 +48,10 @@
 
     @Override
     public void doEvict(FileReference directory) throws HyracksDataException {
+        if (!localIoManager.exists(directory)) {
+            return;
+        }
+
         if (!directory.getFile().isDirectory()) {
             throw new IllegalStateException(directory + " is not a directory");
         }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 3e0fbd4..43b5d1b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -123,11 +123,11 @@
         int did = getDIDfromResourcePath(resourcePath);
         LocalResource resource = resourceRepository.get(resourcePath);
         DatasetResource datasetResource = datasets.get(did);
+        lockNotifier.onRegister(resource, index);
         if (datasetResource == null) {
             datasetResource = getDatasetLifecycle(did);
         }
         datasetResource.register(resource, (ILSMIndex) index);
-        lockNotifier.onRegister(resource, index, datasetResource.getIndexInfo(resource.getId()).getPartition());
     }
 
     private int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
index 036a812..fc55c7c 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
@@ -38,14 +38,14 @@
 
 public final class CloudDiskResourceCacheLockNotifier implements IDiskResourceCacheLockNotifier {
     private static final Logger LOGGER = LogManager.getLogger();
-    private final int metadataPartition;
+    private final IEvictableLocalResourceFilter filter;
     private final Long2ObjectMap<LocalResource> inactiveResources;
     private final Long2ObjectMap<UnsweepableIndexUnit> unsweepableIndexes;
     private final Long2ObjectMap<SweepableIndexUnit> sweepableIndexes;
     private final ReentrantReadWriteLock evictionLock;
 
-    public CloudDiskResourceCacheLockNotifier(int metadataPartition) {
-        this.metadataPartition = metadataPartition;
+    public CloudDiskResourceCacheLockNotifier(IEvictableLocalResourceFilter filter) {
+        this.filter = filter;
         inactiveResources = Long2ObjectMaps.synchronize(new Long2ObjectOpenHashMap<>());
         unsweepableIndexes = Long2ObjectMaps.synchronize(new Long2ObjectOpenHashMap<>());
         sweepableIndexes = Long2ObjectMaps.synchronize(new Long2ObjectOpenHashMap<>());
@@ -53,19 +53,19 @@
     }
 
     @Override
-    public void onRegister(LocalResource localResource, IIndex index, int partition) {
+    public void onRegister(LocalResource localResource, IIndex index) {
         ILSMIndex lsmIndex = (ILSMIndex) index;
         evictionLock.readLock().lock();
         try {
-            if (partition != metadataPartition) {
+            if (filter.accept(localResource)) {
                 long resourceId = localResource.getId();
                 if (lsmIndex.getDiskCacheManager().isSweepable()) {
                     sweepableIndexes.put(resourceId, new SweepableIndexUnit(localResource, lsmIndex));
                 } else {
                     unsweepableIndexes.put(resourceId, new UnsweepableIndexUnit(localResource));
                 }
+                inactiveResources.remove(localResource.getId());
             }
-            inactiveResources.remove(localResource.getId());
         } finally {
             evictionLock.readLock().unlock();
         }
@@ -75,7 +75,7 @@
     public void onUnregister(long resourceId) {
         evictionLock.readLock().lock();
         try {
-            AbstractIndexUnit indexUnit = getUnit(resourceId);
+            AbstractIndexUnit indexUnit = removeUnit(resourceId);
             if (indexUnit != null) {
                 indexUnit.drop();
             } else {
@@ -86,14 +86,6 @@
         }
     }
 
-    private AbstractIndexUnit getUnit(long resourceId) {
-        AbstractIndexUnit indexUnit = sweepableIndexes.get(resourceId);
-        if (indexUnit == null) {
-            indexUnit = unsweepableIndexes.get(resourceId);
-        }
-        return indexUnit;
-    }
-
     @Override
     public void onOpen(long resourceId) {
         evictionLock.readLock().lock();
@@ -122,7 +114,22 @@
         } finally {
             evictionLock.readLock().unlock();
         }
+    }
 
+    private AbstractIndexUnit getUnit(long resourceId) {
+        AbstractIndexUnit indexUnit = sweepableIndexes.get(resourceId);
+        if (indexUnit == null) {
+            indexUnit = unsweepableIndexes.get(resourceId);
+        }
+        return indexUnit;
+    }
+
+    private AbstractIndexUnit removeUnit(long resourceId) {
+        AbstractIndexUnit indexUnit = sweepableIndexes.remove(resourceId);
+        if (indexUnit == null) {
+            indexUnit = unsweepableIndexes.remove(resourceId);
+        }
+        return indexUnit;
     }
 
     ReentrantReadWriteLock getEvictionLock() {
@@ -133,7 +140,8 @@
         inactiveResources.clear();
         // First, skip resources that are already tracked
         for (LocalResource lr : localResources.values()) {
-            if (unsweepableIndexes.containsKey(lr.getId()) || sweepableIndexes.containsKey(lr.getId())) {
+            if (!filter.accept(lr) || unsweepableIndexes.containsKey(lr.getId())
+                    || sweepableIndexes.containsKey(lr.getId())) {
                 // We already have this resource
                 continue;
             }
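
Note: two behavioral fixes here besides the filter. First, onRegister now untracks a resource from inactiveResources only when the filter accepts it, keeping non-evictable resources out of the eviction bookkeeping entirely. Second, onUnregister used getUnit(...), which dropped the unit but left its entry in sweepableIndexes/unsweepableIndexes; removeUnit(...) drops and untracks in one step. The get-vs-remove pattern in isolation:

    import java.util.HashMap;
    import java.util.Map;

    // Stand-alone sketch of the get-vs-remove fix: a dropped unit must also be
    // removed from the tracking map, or the sweeper keeps seeing a dead entry.
    public final class RemoveUnitSketch {
        public static void main(String[] args) {
            Map<Long, String> sweepableIndexes = new HashMap<>();
            sweepableIndexes.put(42L, "indexUnit");

            String unit = sweepableIndexes.remove(42L); // was: sweepableIndexes.get(42L)
            System.out.println("dropped " + unit);
            System.out.println(sweepableIndexes.containsKey(42L)); // false: no stale entry
        }
    }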
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
index f594e06..e466758 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
@@ -31,7 +31,6 @@
 import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
 import org.apache.hyracks.cloud.cache.unit.UnsweepableIndexUnit;
 import org.apache.hyracks.cloud.io.ICloudIOManager;
-import org.apache.hyracks.cloud.sweeper.ISweeper;
 import org.apache.hyracks.cloud.sweeper.Sweeper;
 import org.apache.hyracks.storage.common.LocalResource;
 import org.apache.hyracks.storage.common.buffercache.BufferCache;
@@ -48,7 +47,7 @@
     private final IPhysicalDrive physicalDrive;
     private final List<SweepableIndexUnit> indexes;
     private final ICloudIOManager cloudIOManager;
-    private final ISweeper sweeper;
+    private final Sweeper sweeper;
     private final long inactiveTimeThreshold;
 
     public DiskCacheSweeperThread(ExecutorService executorService, long waitTime,
@@ -161,7 +160,7 @@
     }
 
     @CriticalPath
-    private static void sweepIndexes(ISweeper sweeper, List<SweepableIndexUnit> indexes) {
+    private static void sweepIndexes(Sweeper sweeper, List<SweepableIndexUnit> indexes) {
         for (int i = 0; i < indexes.size(); i++) {
             SweepableIndexUnit index = indexes.get(i);
             if (!index.isSweeping()) {
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/IEvictableLocalResourceFilter.java
similarity index 67%
rename from hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java
rename to hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/IEvictableLocalResourceFilter.java
index 9067a3f..3712918 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/IEvictableLocalResourceFilter.java
@@ -16,20 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.cloud.sweeper;
+package org.apache.hyracks.cloud.cache.service;
 
-import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
-import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.LocalResource;
 
-/**
- * Sweeps an index to relieve the pressure on a local {@link IPhysicalDrive}
- */
 @FunctionalInterface
-public interface ISweeper {
+public interface IEvictableLocalResourceFilter {
     /**
-     * Sweep an index
+     * Whether a local resource is evictable
      *
-     * @param indexUnit to sweep
+     * @param resource resource to test
+     * @return true if the resource is evictable, false otherwise
      */
-    void sweep(SweepableIndexUnit indexUnit) throws InterruptedException;
+    boolean accept(LocalResource resource);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
deleted file mode 100644
index ca103ab..0000000
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.cloud.sweeper;
-
-import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
-
-public final class NoOpSweeper implements ISweeper {
-    public static final ISweeper INSTANCE = new NoOpSweeper();
-
-    private NoOpSweeper() {
-    }
-
-    @Override
-    public void sweep(SweepableIndexUnit indexUnit) {
-        // NoOp
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
index 245c957..6e12b79 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
@@ -35,7 +35,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-public final class Sweeper implements ISweeper {
+public final class Sweeper {
     private static final Logger LOGGER = LogManager.getLogger();
     private static final SweepRequest POISON = new SweepRequest();
     private final BlockingQueue<SweepRequest> requests;
@@ -55,7 +55,6 @@
         }
     }
 
-    @Override
     public void sweep(SweepableIndexUnit indexUnit) throws InterruptedException {
         SweepRequest request = freeRequests.take();
         request.reset(indexUnit);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
index 9fc60fa..5c6fe09 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
@@ -20,7 +20,8 @@
 
 import static org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweeperUtil.EMPTY;
 import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getColumnPageIndex;
-import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getNumberOfPages;
+import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getColumnStartOffset;
+import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getNumberOfRemainingPages;
 
 import java.util.BitSet;
 
@@ -171,15 +172,18 @@
     }
 
     /**
-     * Length of a column in pages
+     * The number of pages the column occupies
      *
      * @param columnIndex column index
      * @return number of pages
      */
     public int getColumnNumberOfPages(int columnIndex) {
         int pageSize = leafFrame.getBuffer().capacity();
-        int numberOfPages = getNumberOfPages(getColumnLength(columnIndex), pageSize);
-        return numberOfPages == 0 ? 1 : numberOfPages;
+        int offset = getColumnStartOffset(leafFrame.getColumnOffset(columnIndex), pageSize);
+        int firstBufferLength = pageSize - offset;
+        int remainingLength = getColumnLength(columnIndex) - firstBufferLength;
+        // 1 for the first page + the number of remaining pages
+        return 1 + getNumberOfRemainingPages(remainingLength, pageSize);
     }
 
     /**
@@ -231,6 +235,10 @@
         return columnsOrder;
     }
 
+    public int getTotalNumberOfPages() {
+        return leafFrame.getMegaLeafNodeNumberOfPages();
+    }
+
     private void init() {
         int numberOfColumns = leafFrame.getNumberOfColumns();
         offsetColumnIndexPairs = LongArrays.ensureCapacity(offsetColumnIndexPairs, numberOfColumns + 1, 0);
@@ -312,5 +320,4 @@
         }
         builder.append('\n');
     }
-
 }
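
Note: the old getColumnNumberOfPages ignored where the column starts within its first page. Worked example with pageSize = 4096: a column that begins at in-page offset 4000 with length 200 spans two pages (96 bytes in the first, 104 in the second), but ceil(200 / 4096) = 1 under-counted it. A stand-alone sketch of the corrected math (the in-page offset is passed directly here; the real code derives it via getColumnStartOffset):

    // Stand-alone sketch mirroring the corrected getColumnNumberOfPages math.
    public final class ColumnPageCountSketch {
        // Mirrors ColumnUtil.getNumberOfRemainingPages
        static int remainingPages(int remainingLength, int pageSize) {
            return (int) Math.ceil((double) remainingLength / pageSize);
        }

        static int columnPages(int offsetInFirstPage, int columnLength, int pageSize) {
            int firstBufferLength = pageSize - offsetInFirstPage;
            // 1 for the first page + the number of remaining pages
            return 1 + remainingPages(columnLength - firstBufferLength, pageSize);
        }

        public static void main(String[] args) {
            // Old math: ceil(200 / 4096) = 1 page. New math: 1 + ceil(104 / 4096.0) = 2 pages.
            System.out.println(columnPages(4000, 200, 4096)); // 2
        }
    }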
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
index 2d6b2fd..7623698 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
 
 import static org.apache.hyracks.cloud.buffercache.context.DefaultCloudReadContext.readAndPersistPage;
+import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MERGE;
 import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MODIFY;
 import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.QUERY;
 
@@ -45,9 +46,12 @@
 import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 import org.apache.hyracks.util.annotations.NotThreadSafe;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 @NotThreadSafe
 public final class CloudColumnReadContext implements IColumnReadContext {
+    private static final Logger LOGGER = LogManager.getLogger();
     private final ColumnProjectorType operation;
     private final IPhysicalDrive drive;
     private final BitSet plan;
@@ -130,10 +134,26 @@
             return;
         }
 
-        // TODO What if every other page is requested. That would do N/2 request, where N is the number of pages.
-        // TODO This should be optimized in a way that minimizes the number of requests
         columnRanges.reset(leafFrame, projectedColumns, plan, cloudOnlyColumns);
         int pageZeroId = leafFrame.getPageId();
+
+        if (operation == MERGE) {
+            pinAll(fileId, pageZeroId, leafFrame.getMegaLeafNodeNumberOfPages() - 1, bufferCache);
+        } else {
+            pinProjected(fileId, pageZeroId, bufferCache);
+        }
+    }
+
+    private void pinAll(int fileId, int pageZeroId, int numberOfPages, IBufferCache bufferCache)
+            throws HyracksDataException {
+        columnCtx.prepare(numberOfPages);
+        pin(bufferCache, fileId, pageZeroId, 1, numberOfPages);
+    }
+
+    private void pinProjected(int fileId, int pageZeroId, IBufferCache bufferCache) throws HyracksDataException {
+        // TODO What if every other page is requested? That would issue N/2 requests, where N is the number of pages.
+        // TODO This should be optimized in a way that minimizes the number of requests
+
         int[] columnsOrders = columnRanges.getColumnsOrder();
         int i = 0;
         int columnIndex = columnsOrders[i];
@@ -143,38 +163,52 @@
                 continue;
             }
 
-            int startPageId = columnRanges.getColumnStartPageIndex(columnIndex);
-            // Will increment the number pages if the next column's pages are contiguous to this column's pages
-            int numberOfPages = columnRanges.getColumnNumberOfPages(columnIndex);
+            int firstPageIdx = columnRanges.getColumnStartPageIndex(columnIndex);
+            // last page of the column
+            int lastPageIdx = firstPageIdx + columnRanges.getColumnNumberOfPages(columnIndex) - 1;
 
             // Advance to the next column to check if it has contiguous pages
             columnIndex = columnsOrders[++i];
             while (columnIndex > -1) {
+                int sharedPageCount = 0;
                 // Get the next column's start page ID
-                int nextStartPageId = columnRanges.getColumnStartPageIndex(columnIndex);
-                if (nextStartPageId > startPageId + numberOfPages + 1) {
-                    // The next startPageId is not contiguous, stop.
+                int nextStartPageIdx = columnRanges.getColumnStartPageIndex(columnIndex);
+                if (nextStartPageIdx > lastPageIdx + 1) {
+                    // The nextStartPageIdx is not contiguous, stop.
                     break;
+                } else if (nextStartPageIdx == lastPageIdx) {
+                    // A shared page
+                    sharedPageCount = 1;
                 }
 
-                // Last page of this column
-                int nextLastPage = nextStartPageId + columnRanges.getColumnNumberOfPages(columnIndex);
-                // The next column's pages are contiguous. Combine its ranges with the previous one.
-                numberOfPages = nextLastPage - startPageId;
+                lastPageIdx += columnRanges.getColumnNumberOfPages(columnIndex) - sharedPageCount;
                 // Advance to the next column
                 columnIndex = columnsOrders[++i];
             }
 
+            if (lastPageIdx >= columnRanges.getTotalNumberOfPages()) {
+                throw new IndexOutOfBoundsException("lastPageIdx=" + lastPageIdx + " >= megaLeafNodePages="
+                        + columnRanges.getTotalNumberOfPages());
+            }
+
+            int numberOfPages = lastPageIdx - firstPageIdx + 1;
             columnCtx.prepare(numberOfPages);
-            pin(bufferCache, fileId, pageZeroId, startPageId, numberOfPages);
+            pin(bufferCache, fileId, pageZeroId, firstPageIdx, numberOfPages);
         }
     }
 
-    private void pin(IBufferCache bufferCache, int fileId, int pageZeroId, int start, int numOfRequestedPages)
+    private void pin(IBufferCache bufferCache, int fileId, int pageZeroId, int start, int numberOfPages)
             throws HyracksDataException {
-        for (int i = start; i < start + numOfRequestedPages; i++) {
+        for (int i = start; i < start + numberOfPages; i++) {
             long dpid = BufferedFileHandle.getDiskPageId(fileId, pageZeroId + i);
-            pinnedPages.add(bufferCache.pin(dpid, columnCtx));
+            try {
+                pinnedPages.add(bufferCache.pin(dpid, columnCtx));
+            } catch (Throwable e) {
+                LOGGER.error("Error while pinning page number {} with number of pages {}. {}\n columnRanges:\n {}", i,
+                        numberOfPages, columnCtx, columnRanges);
+                throw e;
+            }
         }
     }
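
Note: the rewritten pinProjected tracks an inclusive lastPageIdx and counts a page shared by two adjacent columns only once. For example, if one column occupies pages [3, 5] and the next starts on page 5 with 2 pages, the merged range is [3, 6] (4 pins), not [3, 7]. The merge step in isolation:

    // Stand-alone sketch of merging two contiguous column ranges that share a page.
    public final class SharedPageMergeSketch {
        public static void main(String[] args) {
            int firstPageIdx = 3;
            int lastPageIdx = 5;       // first column occupies pages [3, 5]

            int nextStartPageIdx = 5;  // next column starts on the previous column's last page
            int nextNumberOfPages = 2; // and occupies pages [5, 6]

            int sharedPageCount = nextStartPageIdx == lastPageIdx ? 1 : 0;
            lastPageIdx += nextNumberOfPages - sharedPageCount;

            int numberOfPages = lastPageIdx - firstPageIdx + 1;
            System.out.println("[" + firstPageIdx + ", " + lastPageIdx + "] -> " + numberOfPages + " pages");
            // prints: [3, 6] -> 4 pages
        }
    }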
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
index f1b2fd4..21d5ce7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
@@ -19,7 +19,6 @@
 package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
 
 import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MERGE;
-import static org.apache.hyracks.storage.common.buffercache.IBufferCache.RESERVED_HEADER_BYTES;
 import static org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider.DEFAULT;
 
 import java.io.IOException;
@@ -49,10 +48,15 @@
     private final ColumnProjectorType operation;
     private final ColumnRanges columnRanges;
     private final IPhysicalDrive drive;
+
     private int numberOfContiguousPages;
     private int pageCounter;
     private InputStream gapStream;
 
+    // For debugging
+    private long streamOffset;
+    private long remainingStreamBytes;
+
     CloudMegaPageReadContext(ColumnProjectorType operation, ColumnRanges columnRanges, IPhysicalDrive drive) {
         this.operation = operation;
         this.columnRanges = columnRanges;
@@ -76,7 +80,7 @@
              * up writing the bytes of this page in the position of another page. Therefore, we should skip the bytes
              * for this particular page to avoid placing the bytes of this page into another page's position.
              */
-            skipCloudBytes(cachedPage);
+            skipStreamIfOpened(cachedPage);
             pageCounter++;
         }
     }
@@ -102,8 +106,7 @@
         boolean empty = BufferCacheCloudReadContextUtil.isEmpty(header);
         int pageId = BufferedFileHandle.getPageId(cPage.getDiskPageId());
         boolean cloudOnly = columnRanges.isCloudOnly(pageId);
-        ByteBuffer buffer;
-        if (empty || cloudOnly || gapStream != null) {
+        if (empty || cloudOnly) {
             boolean evictable = columnRanges.isEvictable(pageId);
             /*
              * Persist iff the following conditions are satisfied:
@@ -118,26 +121,30 @@
              * 'cloudOnly' is true.
              */
             boolean persist = empty && !cloudOnly && !evictable && operation != MERGE && drive.hasSpace();
-            buffer = readFromStream(ioManager, fileHandle, header, cPage, persist);
-            buffer.position(RESERVED_HEADER_BYTES);
+            readFromStream(ioManager, fileHandle, header, cPage, persist);
         } else {
             /*
-             * Here we can find a page that is planned for eviction, but it has not being evicted yet
-             * (i.e., empty = false). This could happen if the cursor is at a point the sweeper hasn't
-             * reached yet (i.e., cloudOnly = false).
+             * Here we can find a page that is planned for eviction, but it has not been evicted yet
+             * (i.e., empty = false). This could happen if the cursor is at a point the sweeper hasn't
+             * reached yet (i.e., cloudOnly = false). Thus, whatever is read from the disk is valid.
              */
-            buffer = DEFAULT.processHeader(ioManager, fileHandle, header, cPage);
+            skipStreamIfOpened(cPage);
         }
 
         if (++pageCounter == numberOfContiguousPages) {
             close();
         }
 
-        return buffer;
+        // Finally process the header
+        return DEFAULT.processHeader(ioManager, fileHandle, header, cPage);
     }
 
     void close() throws HyracksDataException {
         if (gapStream != null) {
+            if (remainingStreamBytes != 0) {
+                LOGGER.warn("Closed cloud stream with nonzero bytes = {}", remainingStreamBytes);
+            }
+
             try {
                 gapStream.close();
                 gapStream = null;
@@ -147,13 +154,14 @@
         }
     }
 
-    private ByteBuffer readFromStream(IOManager ioManager, BufferedFileHandle fileHandle,
-            BufferCacheHeaderHelper header, CachedPage cPage, boolean persist) throws HyracksDataException {
+    private void readFromStream(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
+            CachedPage cPage, boolean persist) throws HyracksDataException {
         InputStream stream = getOrCreateStream(ioManager, fileHandle, cPage);
         ByteBuffer buffer = header.getBuffer();
         buffer.position(0);
+
         try {
-            while (buffer.remaining() != 0) {
+            while (buffer.remaining() > 0) {
                 int length = stream.read(buffer.array(), buffer.position(), buffer.remaining());
                 if (length < 0) {
                     throw new IllegalStateException("Stream should not be empty!");
@@ -164,6 +172,7 @@
             throw HyracksDataException.create(e);
         }
 
+        // Flip the buffer after reading to restore the correct position
         buffer.flip();
 
         if (persist) {
@@ -172,7 +181,8 @@
             BufferCacheCloudReadContextUtil.persist(cloudIOManager, fileHandle.getFileHandle(), buffer, offset);
         }
 
-        return buffer;
+        streamOffset += cPage.getCompressedPageSize();
+        remainingStreamBytes -= cPage.getCompressedPageSize();
     }
 
     private InputStream getOrCreateStream(IOManager ioManager, BufferedFileHandle fileHandle, CachedPage cPage)
@@ -185,25 +195,31 @@
         long offset = cPage.getCompressedPageOffset();
         int pageId = BufferedFileHandle.getPageId(cPage.getDiskPageId());
         long length = fileHandle.getPagesTotalSize(pageId, requiredNumOfPages);
+        remainingStreamBytes = length;
+        streamOffset = offset;
+        LOGGER.info(
+                "Cloud stream read for pageId={} starting from pageCounter={} out of "
+                        + "numberOfContiguousPages={} (streamOffset = {}, remainingStreamBytes = {})",
+                pageId, pageCounter, numberOfContiguousPages, streamOffset, remainingStreamBytes);
 
-        LOGGER.info("Cloud stream read for {} pages [{}, {}]", numberOfContiguousPages - pageCounter, pageId,
-                pageId + requiredNumOfPages);
         ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
         gapStream = cloudIOManager.cloudRead(fileHandle.getFileHandle(), offset, length);
 
         return gapStream;
     }
 
-    private void skipCloudBytes(CloudCachedPage cachedPage) throws HyracksDataException {
+    private void skipStreamIfOpened(CachedPage cPage) throws HyracksDataException {
         if (gapStream == null) {
             return;
         }
 
         try {
-            long remaining = cachedPage.getCompressedPageSize();
+            long remaining = cPage.getCompressedPageSize();
             while (remaining > 0) {
                 remaining -= gapStream.skip(remaining);
             }
+            streamOffset += cPage.getCompressedPageSize();
+            remainingStreamBytes -= cPage.getCompressedPageSize();
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
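
Note: streamOffset and remainingStreamBytes are debugging counters for the gap stream: readFromStream(...) and skipStreamIfOpened(...) both advance them by the page's compressed size, so every page in the contiguous run is accounted for exactly once whether it is read or skipped, and close() warns if any bytes were left over. The invariant, schematically:

    // Stand-alone sketch of the stream-accounting invariant: each page in the run
    // consumes its compressed size from the stream, read and skipped alike.
    public final class StreamAccountingSketch {
        public static void main(String[] args) {
            int[] compressedSizes = { 700, 1200, 900 }; // assumed compressed page sizes
            long remainingStreamBytes = 700 + 1200 + 900;

            for (int size : compressedSizes) {
                remainingStreamBytes -= size; // same bookkeeping on read and on skip
            }
            System.out.println(remainingStreamBytes); // 0 -> no warning at close()
        }
    }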
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
index 48633e7..d7b0764 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
@@ -19,7 +19,7 @@
 package org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.tuples;
 
 import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getColumnStartOffset;
-import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getNumberOfPages;
+import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getNumberOfRemainingPages;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
@@ -40,7 +40,7 @@
     private final IColumnReadMultiPageOp multiPageOp;
     private final Queue<ICachedPage> pages;
     private final LongSet pinnedPages;
-    private int numberOfPages;
+    private int numberOfRemainingPages;
     private int startPage;
     private int startOffset;
     private int length;
@@ -55,7 +55,7 @@
     @Override
     public void reset(ColumnBTreeReadLeafFrame frame) throws HyracksDataException {
         if (columnIndex >= frame.getNumberOfColumns()) {
-            numberOfPages = 0;
+            numberOfRemainingPages = 0;
             length = 0;
             return;
         }
@@ -70,8 +70,8 @@
         length = ColumnUtil.readColumnLength(firstPage, startOffset, pageSize);
         // Get the remaining length of the column
         int remainingLength = length - firstPage.remaining();
-        // Get the number of pages this column occupies
-        numberOfPages = getNumberOfPages(remainingLength, pageSize);
+        // Get the number of remaining pages this column occupies
+        numberOfRemainingPages = getNumberOfRemainingPages(remainingLength, pageSize);
         //+4-bytes after reading the length
         startOffset += Integer.BYTES;
         //-4-bytes after reading the length
@@ -84,12 +84,12 @@
         buffer.clear();
         buffer.position(startOffset);
         buffers.add(buffer);
-        for (int i = 0; i < numberOfPages; i++) {
+        for (int i = 0; i < numberOfRemainingPages; i++) {
             buffer = readNext().duplicate();
             buffer.clear();
             buffers.add(buffer);
         }
-        numberOfPages = 0;
+        numberOfRemainingPages = 0;
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java
index 12bb64e..fc1e460 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java
@@ -87,7 +87,7 @@
         // Read the length of this column
         int length = firstPage.getInt();
         // Cap the buffer limit at a full page
-        firstPage.limit(Math.min(length, pageSize));
+        firstPage.limit(Math.min(startOffset + length, pageSize));
         return length;
     }
 
@@ -98,7 +98,7 @@
      * @param pageSize        disk buffer cache page size
      * @return number of remaining pages the column occupies
      */
-    public static int getNumberOfPages(int remainingLength, int pageSize) {
+    public static int getNumberOfRemainingPages(int remainingLength, int pageSize) {
         return (int) Math.ceil((double) remainingLength / pageSize);
     }
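
Note: ByteBuffer.limit(int) takes an absolute position, not a length relative to the current position, which is what the readColumnLength fix above corrects. With pageSize = 4096, startOffset = 4000 and a 50-byte column, the old limit of min(50, 4096) = 50 fell below the buffer's position and silently clamped it; the new limit of min(4050, 4096) = 4050 exposes exactly the column's bytes in this page:

    import java.nio.ByteBuffer;

    // Stand-alone sketch: the column's end within the page is an absolute
    // position, startOffset + length, capped at the page size.
    public final class ColumnLimitSketch {
        public static void main(String[] args) {
            int pageSize = 4096, startOffset = 4000, length = 50;
            ByteBuffer firstPage = ByteBuffer.allocate(pageSize);
            firstPage.position(startOffset);
            firstPage.limit(Math.min(startOffset + length, pageSize)); // 4050, not 50
            System.out.println(firstPage.remaining()); // 50 bytes of column data
        }
    }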
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
index 21cefb6..a0ad045 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
@@ -321,4 +321,9 @@
         final String path = fileHandle.getFileReference().getAbsolutePath();
         throw new IllegalStateException(String.format(ERROR_MESSAGE, op, expected, actual, path));
     }
+
+    @Override
+    public String toString() {
+        return fileHandle != null ? fileHandle.getFileReference().getAbsolutePath() : "";
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index ab37532..271afad 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -203,12 +203,11 @@
                 }
             }
 
-            // Notify context page is going to be pinned
-            context.onPin(cPage);
-
             // Resolve race of multiple threads trying to read the page from
             // disk.
             synchronized (cPage) {
+                // Notify context page is going to be pinned
+                context.onPin(cPage);
                 if (!cPage.valid) {
                     try {
                         tryRead(cPage, context);
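
Note: onPin is now invoked inside synchronized (cPage), making the notification atomic with the validity check and read that follow. This matters for CloudMegaPageReadContext, whose onPin skips the page's compressed bytes in the shared gap stream when another thread has already read that page; issuing the skip outside the page's monitor could presumably interleave with a concurrent read of the same page and desynchronize the stream position. The ordering, reduced to its shape:

    // Stand-alone sketch of the reordering: notify-then-validate now happens under
    // the same monitor that readers of the page synchronize on.
    public final class PinOrderingSketch {
        private boolean valid;

        void pin(Runnable onPin) {
            synchronized (this) {  // "this" stands in for the CachedPage monitor
                onPin.run();       // previously ran before entering the monitor
                if (!valid) {
                    valid = true;  // stands in for tryRead(cPage, context)
                }
            }
        }

        public static void main(String[] args) {
            new PinOrderingSketch().pin(() -> System.out.println("onPin under the page lock"));
        }
    }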
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
index 6fd18ff..b9d4e5a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
@@ -23,6 +23,8 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+
 /**
  * @author yingyib
  */
@@ -209,4 +211,10 @@
     public int getCompressedPageSize() {
         return compressedSize;
     }
+
+    @Override
+    public String toString() {
+        return "CachedPage:[page:" + BufferedFileHandle.getPageId(dpid) + ", compressedPageOffset:" + compressedOffset
+                + ", compressedSize:" + compressedSize + "]";
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
index 1e986b3..6ce985e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
@@ -24,7 +24,6 @@
 /**
  * A proxy to notify a disk-cache (a faster disk that is caching a slower resource) about resource lifecycle events.
  * The notifier could block a resource from being operated on if the disk-cache manager is denying access to it
- * TODO Do we care about dataset?
  */
 public interface IDiskResourceCacheLockNotifier {
     /**
@@ -33,9 +32,8 @@
      *
      * @param localResource resource to be registered
      * @param index         of the resource
-     * @param partition     partition
      */
-    void onRegister(LocalResource localResource, IIndex index, int partition);
+    void onRegister(LocalResource localResource, IIndex index);
 
     /**
      * Notify unregistering an existing resource
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
index b83c388..2c590cb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
@@ -28,7 +28,7 @@
     }
 
     @Override
-    public void onRegister(LocalResource localResource, IIndex index, int partition) {
+    public void onRegister(LocalResource localResource, IIndex index) {
         // NoOp
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
index 970a0ae..ff2bd83 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
@@ -114,11 +114,13 @@
         final boolean contiguousLargePages = getPageId(cPage.getDiskPageId()) + 1 == extraBlockPageId;
         IFileHandle handle = getFileHandle();
         long bytesWritten;
+        long offset;
         try {
             buf.limit(contiguousLargePages ? bufferCache.getPageSize() * totalPages : bufferCache.getPageSize());
             buf.position(0);
             ByteBuffer[] buffers = header.prepareWrite(cPage);
-            bytesWritten = context.write(ioManager, handle, getFirstPageOffset(cPage), buffers);
+            offset = getFirstPageOffset(cPage);
+            bytesWritten = context.write(ioManager, handle, offset, buffers);
         } finally {
             returnHeaderHelper(header);
         }
@@ -130,6 +132,9 @@
 
         final int expectedWritten = bufferCache.getPageSizeWithHeader() + bufferCache.getPageSize() * (totalPages - 1);
         verifyBytesWritten(expectedWritten, bytesWritten);
+
+        cPage.setCompressedPageOffset(offset);
+        cPage.setCompressedPageSize((int) bytesWritten);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
index 6fe3846..cd882b5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
@@ -114,15 +114,16 @@
             } else {
                 uBuffer.position(0);
             }
+            long offset;
             if (compressToWriteBuffer(uBuffer, cBuffer) < bufferCache.getPageSize()) {
                 cBuffer.position(0);
-                final long offset = compressedFileManager.writePageInfo(pageId, cBuffer.remaining());
+                offset = compressedFileManager.writePageInfo(pageId, cBuffer.remaining());
                 expectedBytesWritten = cBuffer.limit();
                 bytesWritten = context.write(ioManager, handle, offset, cBuffer);
             } else {
                 //Compression did not gain any savings
                 final ByteBuffer[] buffers = header.prepareWrite(cPage);
-                final long offset = compressedFileManager.writePageInfo(pageId, bufferCache.getPageSizeWithHeader());
+                offset = compressedFileManager.writePageInfo(pageId, bufferCache.getPageSizeWithHeader());
                 expectedBytesWritten = buffers[0].limit() + (long) buffers[1].limit();
                 bytesWritten = context.write(ioManager, handle, offset, buffers);
             }
@@ -134,6 +135,9 @@
                 writeExtraCompressedPages(cPage, cBuffer, totalPages, extraBlockPageId);
             }
 
+            cPage.setCompressedPageOffset(offset);
+            cPage.setCompressedPageSize((int) bytesWritten);
+
         } finally {
             returnHeaderHelper(header);
         }
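
Note: both write paths now record the on-disk (offset, size) pair on the CachedPage, which is what the cloud read path above depends on: getOrCreateStream(...) opens the gap stream at cPage.getCompressedPageOffset(), and the read/skip accounting advances by cPage.getCompressedPageSize(). For a compressed file, the byte range covering pages [p, p+n) cannot be derived as n * pageSize, so each page must carry its own extent. Schematically:

    // Stand-alone sketch: with per-page (offset, size) recorded at write time, one
    // ranged cloud read can cover a run of compressed pages of varying sizes.
    public final class CompressedExtentSketch {
        record PageExtent(long offset, int size) {
        }

        public static void main(String[] args) {
            PageExtent[] run = { new PageExtent(0, 700), new PageExtent(700, 1200), new PageExtent(1900, 900) };

            long streamOffset = run[0].offset(); // first page's compressed offset
            long streamLength = 0;
            for (PageExtent page : run) {
                streamLength += page.size(); // sum of compressed page sizes
            }
            System.out.println("read " + streamLength + " bytes at offset " + streamOffset);
        }
    }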