[MULTIPLE ISSUES][STO] Multiple fixes for cloud deployment.

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

ASTERIXDB-3436
- Make certain cloud requests uninterruptible
- Retry on request failure

ASTERIXDB-3443
- Fix MergedPageRanges out-of-bounds exception
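
Illustrative usage of the new retry utility introduced by this change
(mirrors AbstractCloudIOManager#cloudRead below):

    int position = buffer.position();
    ICloudRequest request = () -> cloudClient.read(bucket, path, offset, buffer);
    ICloudBeforeRetryRequest retry = () -> buffer.position(position);
    CloudRetryableRequestUtil.run(request, retry);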

Change-Id: Ia8c34d4ba7a3527fea22149e5065815095c39ab2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18378
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageUnstableTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageUnstableTest.java
new file mode 100644
index 0000000..a517ed4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageUnstableTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.cloud_storage;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.asterix.api.common.LocalCloudUtil;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.Description;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Run tests in cloud deployment environment with simulated unstable connection
+ */
+@RunWith(Parameterized.class)
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class CloudStorageUnstableTest {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private final TestCaseContext tcCtx;
+    private static final String SUITE_TESTS = "testsuite_cloud_storage.xml";
+    private static final String ONLY_TESTS = "testsuite_cloud_storage_only.xml";
+    private static final String CONFIG_FILE_NAME = "src/test/resources/cc-cloud-storage.conf";
+    private static final String DELTA_RESULT_PATH = "results_cloud";
+    private static final String EXCLUDED_TESTS = "MP";
+
+    public CloudStorageUnstableTest(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        System.setProperty(CloudRetryableRequestUtil.CLOUD_UNSTABLE_MODE, "true");
+        LocalCloudUtil.startS3CloudEnvironment(true);
+        TestExecutor testExecutor = new TestExecutor(DELTA_RESULT_PATH);
+        testExecutor.executorId = "cloud";
+        testExecutor.stripSubstring = "//DB:";
+        LangExecutionUtil.setUp(CONFIG_FILE_NAME, testExecutor);
+        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, CONFIG_FILE_NAME);
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        System.clearProperty(CloudRetryableRequestUtil.CLOUD_UNSTABLE_MODE);
+        LangExecutionUtil.tearDown();
+    }
+
+    @Parameters(name = "CloudStorageUnstableTest {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        long seed = System.nanoTime();
+        Random random = new Random(seed);
+        LOGGER.info("CloudStorageUnstableTest seed {}", seed);
+        Collection<Object[]> tests = LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS);
+        List<Object[]> selected = new ArrayList<>();
+        for (Object[] test : tests) {
+            // Select 10% of the tests randomly
+            if (random.nextInt(10) == 0) {
+                selected.add(test);
+            }
+        }
+        return selected;
+    }
+
+    @Test
+    public void test() throws Exception {
+        List<TestCase.CompilationUnit> cu = tcCtx.getTestCase().getCompilationUnit();
+        Assume.assumeTrue(cu.size() > 1 || !EXCLUDED_TESTS.equals(getText(cu.get(0).getDescription())));
+        LangExecutionUtil.test(tcCtx);
+    }
+
+    private static String getText(Description description) {
+        return description == null ? "" : description.getValue();
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 35b0744..91c24e8 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -51,6 +51,10 @@
 import org.apache.hyracks.api.io.IIOBulkOperation;
 import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.cloud.io.request.ICloudBeforeRetryRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.io.stream.CloudInputStream;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
 import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -173,12 +177,30 @@
 
     @Override
     public final void cloudRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
-        cloudClient.read(bucket, fHandle.getFileReference().getRelativePath(), offset, data);
+        int position = data.position();
+        ICloudRequest request =
+                () -> cloudClient.read(bucket, fHandle.getFileReference().getRelativePath(), offset, data);
+        ICloudBeforeRetryRequest retry = () -> data.position(position);
+        CloudRetryableRequestUtil.run(request, retry);
     }
 
     @Override
-    public final InputStream cloudRead(IFileHandle fHandle, long offset, long length) {
-        return cloudClient.getObjectStream(bucket, fHandle.getFileReference().getRelativePath(), offset, length);
+    public final CloudInputStream cloudRead(IFileHandle fHandle, long offset, long length) throws HyracksDataException {
+        return CloudRetryableRequestUtil.run(() -> new CloudInputStream(this, fHandle,
+                cloudClient.getObjectStream(bucket, fHandle.getFileReference().getRelativePath(), offset, length),
+                offset, length));
+    }
+
+    @Override
+    public void restoreStream(CloudInputStream cloudStream) {
+        LOGGER.warn("Restoring stream from cloud, {}", cloudStream);
+        /*
+         * This cloud request should not be called using CloudRetryableRequestUtil, as it is the responsibility
+         * of the caller to wrap this request as an ICloudRequest or an ICloudBeforeRetryRequest.
+         */
+        InputStream stream = cloudClient.getObjectStream(bucket, cloudStream.getPath(), cloudStream.getOffset(),
+                cloudStream.getRemaining());
+        cloudStream.setInputStream(stream);
     }
 
     @Override
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
index 9e35020..cace898 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
@@ -25,6 +25,9 @@
 import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
 import org.apache.asterix.cloud.clients.ICloudWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.io.request.ICloudBeforeRetryRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -141,7 +144,9 @@
                  */
                 writeBuffer.flip();
                 try {
-                    bufferedWriter.uploadLast(this, writeBuffer);
+                    ICloudRequest request = () -> bufferedWriter.uploadLast(this, writeBuffer);
+                    ICloudBeforeRetryRequest retry = () -> writeBuffer.position(0);
+                    CloudRetryableRequestUtil.runWithNoRetryOnInterruption(request, retry);
                 } catch (Exception e) {
                     LOGGER.error(e);
                     throw HyracksDataException.create(e);
@@ -182,7 +187,10 @@
     private void uploadAndWait() throws HyracksDataException {
         writeBuffer.flip();
         try {
-            bufferedWriter.upload(this, writeBuffer.limit());
+            ICloudRequest request = () -> bufferedWriter.upload(this, writeBuffer.limit());
+            ICloudBeforeRetryRequest retry = () -> writeBuffer.position(0);
+            // An interruption of this call will be followed by a halt, so there is no retry on interruption
+            CloudRetryableRequestUtil.runWithNoRetryOnInterruption(request, retry);
         } catch (Exception e) {
             LOGGER.error(e);
             throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 49b919c..f8cf2c3 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -53,6 +53,7 @@
 import org.apache.hyracks.api.io.IIOBulkOperation;
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
 import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -174,7 +175,7 @@
 
     @Override
     public Set<FileReference> list(FileReference dir, FilenameFilter filter) throws HyracksDataException {
-        return accessor.doList(dir, filter);
+        return CloudRetryableRequestUtil.run(() -> accessor.doList(dir, filter));
     }
 
     @Override
@@ -189,18 +190,18 @@
 
     @Override
     public byte[] readAllBytes(FileReference fileRef) throws HyracksDataException {
-        return accessor.doReadAllBytes(fileRef);
+        return CloudRetryableRequestUtil.run(() -> accessor.doReadAllBytes(fileRef));
     }
 
     @Override
     public void delete(FileReference fileRef) throws HyracksDataException {
-        accessor.doDelete(fileRef);
+        CloudRetryableRequestUtil.run(() -> accessor.doDelete(fileRef));
         log("DELETE", fileRef);
     }
 
     @Override
     public void overwrite(FileReference fileRef, byte[] bytes) throws HyracksDataException {
-        accessor.doOverwrite(fileRef, bytes);
+        CloudRetryableRequestUtil.run(() -> accessor.doOverwrite(fileRef, bytes));
         log("WRITE", fileRef);
     }
 
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
index 8d28d3a..fcb7bda 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
@@ -24,6 +24,7 @@
 import org.apache.asterix.cloud.clients.ICloudClient;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
 import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.control.nc.io.bulk.DeleteBulkOperation;
 import org.apache.logging.log4j.LogManager;
@@ -57,7 +58,7 @@
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Bulk deleting: local: {}, cloud: {}", fileReferences, paths);
         }
-        cloudClient.deleteObjects(bucket, paths);
+        CloudRetryableRequestUtil.run(() -> cloudClient.deleteObjects(bucket, paths));
         // Bulk delete locally as well
         super.performOperation();
         callBack.call(fileReferences);
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
index b0a1e0c..ee43a2c 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
@@ -24,8 +24,10 @@
 import org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient;
 import org.apache.asterix.common.config.CloudProperties;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
 
 public class CloudClientProvider {
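+    // When 'cloud.unstable.mode' is set, the client is wrapped to inject simulated request failures (testing only)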
+    private static final boolean UNSTABLE = isUnstable();
     public static final String S3 = "s3";
     public static final String GCS = "gs";
 
@@ -36,13 +38,21 @@
     public static ICloudClient getClient(CloudProperties cloudProperties, ICloudGuardian guardian)
             throws HyracksDataException {
         String storageScheme = cloudProperties.getStorageScheme();
+        ICloudClient cloudClient;
         if (S3.equalsIgnoreCase(storageScheme)) {
             S3ClientConfig config = S3ClientConfig.of(cloudProperties);
-            return new S3CloudClient(config, guardian);
+            cloudClient = new S3CloudClient(config, guardian);
         } else if (GCS.equalsIgnoreCase(storageScheme)) {
             GCSClientConfig config = GCSClientConfig.of(cloudProperties);
-            return new GCSCloudClient(config, guardian);
+            cloudClient = new GCSCloudClient(config, guardian);
+        } else {
+            throw new IllegalStateException("unsupported cloud storage scheme: " + storageScheme);
         }
-        throw new IllegalStateException("unsupported cloud storage scheme: " + storageScheme);
+
+        return UNSTABLE ? new UnstableCloudClient(cloudClient) : cloudClient;
+    }
+
+    private static boolean isUnstable() {
+        return Boolean.getBoolean(CloudRetryableRequestUtil.CLOUD_UNSTABLE_MODE);
     }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
new file mode 100644
index 0000000..2ec5fb5
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients;
+
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.asterix.cloud.CloudResettableInputStream;
+import org.apache.asterix.cloud.IWriteBufferProvider;
+import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.IOManager;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class UnstableCloudClient implements ICloudClient {
+    // 10% error rate
+    private static final double ERROR_RATE = 0.1d;
+    private static final Random RANDOM = new Random(0);
+    private final ICloudClient cloudClient;
+
+    public UnstableCloudClient(ICloudClient cloudClient) {
+        this.cloudClient = cloudClient;
+    }
+
+    @Override
+    public int getWriteBufferSize() {
+        return cloudClient.getWriteBufferSize();
+    }
+
+    @Override
+    public IRequestProfiler getProfiler() {
+        return cloudClient.getProfiler();
+    }
+
+    @Override
+    public ICloudWriter createWriter(String bucket, String path, IWriteBufferProvider bufferProvider) {
+        if (cloudClient instanceof S3CloudClient) {
+            return createUnstableWriter((S3CloudClient) cloudClient, bucket, path, bufferProvider);
+        }
+        return cloudClient.createWriter(bucket, path, bufferProvider);
+    }
+
+    @Override
+    public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) {
+        return cloudClient.listObjects(bucket, path, filter);
+    }
+
+    @Override
+    public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException {
+        fail();
+        return cloudClient.read(bucket, path, offset, buffer);
+    }
+
+    @Override
+    public byte[] readAllBytes(String bucket, String path) throws HyracksDataException {
+        fail();
+        return cloudClient.readAllBytes(bucket, path);
+    }
+
+    @Override
+    public InputStream getObjectStream(String bucket, String path, long offset, long length) {
+        return cloudClient.getObjectStream(bucket, path, offset, length);
+    }
+
+    @Override
+    public void write(String bucket, String path, byte[] data) {
+        cloudClient.write(bucket, path, data);
+    }
+
+    @Override
+    public void copy(String bucket, String srcPath, FileReference destPath) {
+        cloudClient.copy(bucket, srcPath, destPath);
+    }
+
+    @Override
+    public void deleteObjects(String bucket, Collection<String> paths) {
+        cloudClient.deleteObjects(bucket, paths);
+    }
+
+    @Override
+    public long getObjectSize(String bucket, String path) throws HyracksDataException {
+        fail();
+        return cloudClient.getObjectSize(bucket, path);
+    }
+
+    @Override
+    public boolean exists(String bucket, String path) throws HyracksDataException {
+        fail();
+        return cloudClient.exists(bucket, path);
+    }
+
+    @Override
+    public boolean isEmptyPrefix(String bucket, String path) throws HyracksDataException {
+        fail();
+        return cloudClient.isEmptyPrefix(bucket, path);
+    }
+
+    @Override
+    public IParallelDownloader createParallelDownloader(String bucket, IOManager ioManager)
+            throws HyracksDataException {
+        return cloudClient.createParallelDownloader(bucket, ioManager);
+    }
+
+    @Override
+    public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) {
+        return cloudClient.listAsJson(objectMapper, bucket);
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        cloudClient.close();
+    }
+
+    private static void fail() throws HyracksDataException {
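+        // Simulate an unstable connection by failing a call with probability ERROR_RATE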
+        double prob = RANDOM.nextInt(100) / 100.0d;
+        if (prob < ERROR_RATE) {
+            throw HyracksDataException.create(new IOException("Simulated error"));
+        }
+    }
+
+    private static ICloudWriter createUnstableWriter(S3CloudClient cloudClient, String bucket, String path,
+            IWriteBufferProvider bufferProvider) {
+        ICloudBufferedWriter bufferedWriter =
+                new UnstableCloudBufferedWriter(cloudClient.createBufferedWriter(bucket, path));
+        return new CloudResettableInputStream(bufferedWriter, bufferProvider);
+    }
+
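+    // Delegates to the wrapped buffered writer, injecting simulated failures into upload calls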
+    private static class UnstableCloudBufferedWriter implements ICloudBufferedWriter {
+        private final ICloudBufferedWriter bufferedWriter;
+
+        private UnstableCloudBufferedWriter(ICloudBufferedWriter bufferedWriter) {
+            this.bufferedWriter = bufferedWriter;
+        }
+
+        @Override
+        public void upload(InputStream stream, int length) throws HyracksDataException {
+            fail();
+            bufferedWriter.upload(stream, length);
+        }
+
+        @Override
+        public void uploadLast(InputStream stream, ByteBuffer buffer) throws HyracksDataException {
+            fail();
+            bufferedWriter.uploadLast(stream, buffer);
+        }
+
+        @Override
+        public boolean isEmpty() {
+            return bufferedWriter.isEmpty();
+        }
+
+        @Override
+        public void finish() throws HyracksDataException {
+            bufferedWriter.finish();
+        }
+
+        @Override
+        public void abort() throws HyracksDataException {
+            bufferedWriter.abort();
+        }
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
index d0dda2a..05a9fc1 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
@@ -22,12 +22,12 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
 import org.apache.asterix.cloud.clients.ICloudGuardian;
 import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -84,7 +84,6 @@
         if (uploadId == null) {
             profiler.objectWrite();
             PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(path).build();
-            // TODO make retryable
             s3Client.putObject(request, RequestBody.fromByteBuffer(buffer));
             // Only set the uploadId if the putObject succeeds
             uploadId = PUT_UPLOAD_ID;
@@ -111,28 +110,9 @@
         CompletedMultipartUpload completedMultipartUpload = CompletedMultipartUpload.builder().parts(partQueue).build();
         CompleteMultipartUploadRequest completeMultipartUploadRequest = CompleteMultipartUploadRequest.builder()
                 .bucket(bucket).key(path).uploadId(uploadId).multipartUpload(completedMultipartUpload).build();
-        int retries = 0;
-        while (true) {
-            try {
-                completeMultipartUpload(completeMultipartUploadRequest);
-                break;
-            } catch (Exception e) {
-                retries++;
-                if (retries == MAX_RETRIES) {
-                    throw HyracksDataException.create(e);
-                }
-                LOGGER.info(() -> "S3 storage write retry, encountered: " + e.getMessage());
-
-                // Backoff for 1 sec for the first 2 retries, and 2 seconds from there onward
-                try {
-                    Thread.sleep(TimeUnit.SECONDS.toMillis(retries < 2 ? 1 : 2));
-                } catch (InterruptedException ex) {
-                    Thread.currentThread().interrupt();
-                    throw HyracksDataException.create(ex);
-                }
-            }
-        }
-
+        // An interruption of this call will be followed by a halt, so there is no retry on interruption
+        CloudRetryableRequestUtil
+                .runWithNoRetryOnInterruption(() -> completeMultipartUpload(completeMultipartUploadRequest));
         log("FINISHED");
     }
 
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 5ce1f43..254bd03 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -288,6 +288,13 @@
         s3Client.close();
     }
 
+    /**
+     * FOR TESTING ONLY
+     */
+    public ICloudBufferedWriter createBufferedWriter(String bucket, String path) {
+        return new S3BufferedWriter(s3Client, profiler, guardian, bucket, path);
+    }
+
     private static S3Client buildClient(S3ClientConfig config) {
         S3ClientBuilder builder = S3Client.builder();
         builder.credentialsProvider(config.createCredentialsProvider());
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
index 0dca417..ab57139 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
@@ -18,11 +18,14 @@
  */
 package org.apache.hyracks.cloud.io;
 
-import java.io.InputStream;
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.cloud.io.request.ICloudBeforeRetryRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.io.stream.CloudInputStream;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
 
 /**
  * Certain operations needed to be provided by {@link org.apache.hyracks.api.io.IIOManager} to support cloud
@@ -45,7 +48,17 @@
      * @param offset  starting offset
      * @return input stream of the required data
      */
-    InputStream cloudRead(IFileHandle fHandle, long offset, long length);
+    CloudInputStream cloudRead(IFileHandle fHandle, long offset, long length) throws HyracksDataException;
+
+    /**
+     * Tries to restore the stream created by {@link #cloudRead(IFileHandle, long, long)}
+     * NOTE: The implementer of this method should not use {@link CloudRetryableRequestUtil} when calling this method.
+     * It is the responsibility of the caller to either call this method as a
+     * {@link ICloudRequest} or as a {@link ICloudBeforeRetryRequest}.
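+     * For example, a caller may run {@code CloudRetryableRequestUtil.run(() -> ioManager.restoreStream(stream))}.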
+     *
+     * @param stream to restore
+     */
+    void restoreStream(CloudInputStream stream);
 
     /**
      * Write to local drive only
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java
new file mode 100644
index 0000000..4dab80d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.io.request;
+
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
+
+/**
+ * Certain cloud requests require some cleanup (or restoring a state) before a retry is performed.
+ * An implementation of this interface should be provided if such cleanup is required when
+ * reattempting a request using {@link CloudRetryableRequestUtil}, e.g., resetting a ByteBuffer's
+ * position before re-running a read request.
+ */
+@FunctionalInterface
+public interface ICloudBeforeRetryRequest {
+    /**
+     * Run pre-retry routine before reattempting {@link ICloudRequest} or {@link ICloudReturnableRequest}
+     */
+    void beforeRetry();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRequest.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRequest.java
new file mode 100644
index 0000000..4f25323
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRequest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.io.request;
+
+import java.io.IOException;
+
+/**
+ * A cloud request that can be retried using {@link org.apache.hyracks.cloud.util.CloudRetryableRequestUtil}
+ */
+@FunctionalInterface
+public interface ICloudRequest {
+    /**
+     * Run the cloud request
+     */
+    void call() throws IOException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudReturnableRequest.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudReturnableRequest.java
new file mode 100644
index 0000000..748165a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudReturnableRequest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.io.request;
+
+import java.io.IOException;
+
+/**
+ * A returnable cloud request that can be retried using {@link org.apache.hyracks.cloud.util.CloudRetryableRequestUtil}
+ */
+@FunctionalInterface
+public interface ICloudReturnableRequest<T> {
+    /**
+     * Run the cloud request
+     *
+     * @return the value of the request
+     */
+    T call() throws IOException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/stream/CloudInputStream.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/stream/CloudInputStream.java
new file mode 100644
index 0000000..df12003
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/stream/CloudInputStream.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.io.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.cloud.io.request.ICloudBeforeRetryRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public final class CloudInputStream {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final ICloudIOManager cloudIOManager;
+    private final IFileHandle handle;
+    private InputStream in;
+    private long offset;
+    private long remaining;
+
+    public CloudInputStream(ICloudIOManager cloudIOManager, IFileHandle handle, InputStream in, long offset,
+            long length) {
+        this.cloudIOManager = cloudIOManager;
+        this.handle = handle;
+        this.in = in;
+        this.offset = offset;
+        this.remaining = length;
+    }
+
+    public String getPath() {
+        return handle.getFileReference().getRelativePath();
+    }
+
+    public long getOffset() {
+        return offset;
+    }
+
+    public long getRemaining() {
+        return remaining;
+    }
+
+    public void read(ByteBuffer buffer) throws HyracksDataException {
+        int position = buffer.position();
+        ICloudRequest read = () -> {
+            while (buffer.remaining() > 0) {
+                int length = in.read(buffer.array(), buffer.position(), buffer.remaining());
+                if (length < 0) {
+                    throw new IllegalStateException("Stream should not be empty!");
+                }
+                buffer.position(buffer.position() + length);
+            }
+        };
+
+        ICloudBeforeRetryRequest retry = () -> {
+            buffer.position(position);
+            cloudIOManager.restoreStream(this);
+        };
+
+        CloudRetryableRequestUtil.run(read, retry);
+
+        offset += buffer.limit();
+        remaining -= buffer.limit();
+    }
+
+    public void skipTo(long newOffset) throws HyracksDataException {
+        if (newOffset > offset) {
+            skip(newOffset - offset);
+        }
+    }
+
+    public void close() {
+        if (remaining != 0) {
+            LOGGER.warn("Closed cloud stream with nonzero bytes = {}", remaining);
+        }
+
+        try {
+            in.close();
+        } catch (IOException e) {
+            LOGGER.error("Failed to close stream", e);
+        }
+    }
+
+    public void setInputStream(InputStream in) {
+        this.in = in;
+    }
+
+    private void skip(long n) throws HyracksDataException {
+        /*
+         * Advance offset and reduce the remaining so that the streamRestore will start from where we want to skip
+         * in case the stream has to be restored.
+         */
+        offset += n;
+        remaining -= n;
+
+        try {
+            long remaining = n;
+            while (remaining > 0) {
+                remaining -= in.skip(remaining);
+            }
+        } catch (Throwable e) {
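+            // 'offset' and 'remaining' were advanced before skipping, so a restored
+            // stream resumes at the target position even though the skip failed part-way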
+            if (remaining > 0) {
+                // Only restore the stream if additional bytes are required
+                CloudRetryableRequestUtil.run(() -> cloudIOManager.restoreStream(this));
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "{file: " + handle.getFileReference() + ", streamOffset: " + offset + ", streamRemaining: " + remaining
+                + "}";
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
new file mode 100644
index 0000000..2d0afbe
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.util;
+
+import java.io.IOException;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.cloud.io.request.ICloudBeforeRetryRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.io.request.ICloudReturnableRequest;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Run {@link ICloudRequest} and {@link ICloudReturnableRequest} with retries
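+ * <p>
+ * For non-idempotent requests, an {@link ICloudBeforeRetryRequest} restores state before each retry.
+ * An illustrative use (mirroring {@code AbstractCloudIOManager#cloudRead}):
+ *
+ * <pre>{@code
+ * int position = buffer.position();
+ * CloudRetryableRequestUtil.run(() -> client.read(bucket, path, offset, buffer),
+ *         () -> buffer.position(position));
+ * }</pre>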
+ */
+public class CloudRetryableRequestUtil {
+    /**
+     * Whether to simulate an unstable cloud environment for testing. This value affects the number of retries.
+     * Set this as a system property to 'true' to indicate running an unstable cloud environment.
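+     * For example, pass {@code -Dcloud.unstable.mode=true} to the JVM.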
+     *
+     * @see System#setProperty(String, String)
+     */
+    public static final String CLOUD_UNSTABLE_MODE = "cloud.unstable.mode";
+    private static final int STABLE_NUMBER_OF_RETRIES = 5;
+    private static final int UNSTABLE_NUMBER_OF_RETRIES = 100;
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final int NUMBER_OF_RETRIES = getNumberOfRetries();
+
+    private static final ICloudBeforeRetryRequest NO_OP_RETRY = () -> {
+    };
+
+    private CloudRetryableRequestUtil() {
+    }
+
+    /**
+     * Runs an idempotent request, retrying if it fails or is interrupted.
+     *
+     * @param request request to run
+     */
+    public static void run(ICloudRequest request) throws HyracksDataException {
+        run(request, NO_OP_RETRY);
+    }
+
+    /**
+     * Runs a non-idempotent request, retrying if it fails or is interrupted.
+     * As the operation is not idempotent, {@link ICloudBeforeRetryRequest} ensures the idempotency of the operation
+     *
+     * @param request request to run
+     * @param retry   a pre-retry routine to make the operation idempotent
+     */
+    public static void run(ICloudRequest request, ICloudBeforeRetryRequest retry) throws HyracksDataException {
+        boolean interrupted = Thread.interrupted();
+        try {
+            while (true) {
+                try {
+                    doRun(request, retry);
+                    break;
+                } catch (Throwable e) {
+                    // First, clear the interrupted flag
+                    interrupted |= Thread.interrupted();
+                    if (!ExceptionUtils.causedByInterrupt(e)) {
+                        // The cause isn't an interruption, rethrow
+                        throw e;
+                    }
+                    retry.beforeRetry();
+                    LOGGER.warn("Ignored interrupting ICloudReturnableRequest", e);
+                }
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Runs an idempotent returnable request, retrying if it fails or is interrupted.
+     *
+     * @param request request to run
+     * @param <T>     return type
+     * @return a value of return type
+     */
+    public static <T> T run(ICloudReturnableRequest<T> request) throws HyracksDataException {
+        return run(request, NO_OP_RETRY);
+    }
+
+    /**
+     * Runs a non-idempotent returnable request, retrying if it fails or is interrupted.
+     * As the operation is not idempotent, {@link ICloudBeforeRetryRequest} ensures the idempotency of the operation
+     *
+     * @param request request to run
+     * @param <T>     return type
+     * @param retry   a pre-retry routine to make the operation idempotent
+     * @return a value of return type
+     */
+    public static <T> T run(ICloudReturnableRequest<T> request, ICloudBeforeRetryRequest retry)
+            throws HyracksDataException {
+        boolean interrupted = Thread.interrupted();
+        try {
+            while (true) {
+                try {
+                    return doRun(request, retry);
+                } catch (Throwable e) {
+                    // First, clear the interrupted flag
+                    interrupted |= Thread.interrupted();
+                    if (!ExceptionUtils.causedByInterrupt(e)) {
+                        // The cause isn't an interruption, rethrow
+                        throw e;
+                    }
+                    retry.beforeRetry();
+                    LOGGER.warn("Ignored interrupting ICloudReturnableRequest", e);
+                }
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Runs an idempotent request, retrying if it fails.
+     * This will not retry if the thread is interrupted.
+     *
+     * @param request request to run
+     */
+    public static void runWithNoRetryOnInterruption(ICloudRequest request) throws HyracksDataException {
+        doRun(request, NO_OP_RETRY);
+    }
+
+    /**
+     * Runs a non-idempotent request, retrying if it fails.
+     * This will not retry if the thread is interrupted.
+     * As the operation is not idempotent, {@link ICloudBeforeRetryRequest} ensures the idempotency of the operation
+     *
+     * @param request request to run
+     * @param retry   a pre-retry routine to make the operation idempotent
+     */
+    public static void runWithNoRetryOnInterruption(ICloudRequest request, ICloudBeforeRetryRequest retry)
+            throws HyracksDataException {
+        doRun(request, retry);
+    }
+
+    private static <T> T doRun(ICloudReturnableRequest<T> request, ICloudBeforeRetryRequest retry)
+            throws HyracksDataException {
+        int attempt = 1;
+        while (true) {
+            try {
+                return request.call();
+            } catch (IOException e) {
+                if (attempt > NUMBER_OF_RETRIES) {
+                    throw HyracksDataException.create(e);
+                }
+                attempt++;
+                retry.beforeRetry();
+                LOGGER.warn("Failed to perform ICloudReturnableRequest, performing {}/{}", attempt, NUMBER_OF_RETRIES,
+                        e);
+            }
+        }
+    }
+
+    private static void doRun(ICloudRequest request, ICloudBeforeRetryRequest retry) throws HyracksDataException {
+        int attempt = 1;
+        while (true) {
+            try {
+                request.call();
+                break;
+            } catch (IOException e) {
+                if (attempt > NUMBER_OF_RETRIES) {
+                    throw HyracksDataException.create(e);
+                }
+                attempt++;
+                retry.beforeRetry();
+                LOGGER.warn("Failed to perform ICloudRequest, performing {}/{}", attempt, NUMBER_OF_RETRIES, e);
+            }
+        }
+    }
+
+    private static int getNumberOfRetries() {
+        boolean unstable = Boolean.getBoolean(CLOUD_UNSTABLE_MODE);
+        return unstable ? UNSTABLE_NUMBER_OF_RETRIES : STABLE_NUMBER_OF_RETRIES;
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/AbstractPageRangesComputer.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/AbstractPageRangesComputer.java
new file mode 100644
index 0000000..76b9650
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/AbstractPageRangesComputer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
+
+import java.util.BitSet;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+
+public abstract class AbstractPageRangesComputer {
+    protected static final int INITIAL_SIZE = 40;
+    // Indicates whether a page is requested
+    protected final BitSet requestedPages;
+
+    AbstractPageRangesComputer() {
+        requestedPages = new BitSet();
+    }
+
+    abstract int getMaxNumberOfRanges();
+
+    /**
+     * Clear ranges
+     */
+    abstract void clear();
+
+    /**
+     * Add a range
+     *
+     * @param start range start
+     * @param end   range end
+     */
+    abstract void addRange(int start, int end);
+
+    /**
+     * Pin the calculated ranges
+     *
+     * @param ctx         Column mega-page buffer cache read context
+     * @param bufferCache buffer cache
+     * @param fileId      fileId
+     * @param pageZeroId  page zero ID
+     */
+    abstract void pin(CloudMegaPageReadContext ctx, IBufferCache bufferCache, int fileId, int pageZeroId)
+            throws HyracksDataException;
+
+    /**
+     * Creates a new range computer
+     *
+     * @param maxNumberOfRanges maximum number of ranges
+     * @return a new instance of {@link AbstractPageRangesComputer}
+     */
+    static AbstractPageRangesComputer create(int maxNumberOfRanges) {
+        if (maxNumberOfRanges == 1) {
+            return new SinglePageRangeComputer();
+        }
+
+        return new PageRangesComputer(maxNumberOfRanges);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
index fef150a..1ecc509 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
@@ -22,6 +22,7 @@
 import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MERGE;
 import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MODIFY;
 import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.QUERY;
+import static org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read.CloudMegaPageReadContext.ALL_PAGES;
 
 import java.nio.ByteBuffer;
 import java.util.BitSet;
@@ -55,7 +56,7 @@
     private final ColumnRanges columnRanges;
     private final CloudMegaPageReadContext columnCtx;
     private final BitSet projectedColumns;
-    private final MergedPageRanges mergedPageRanges;
+    private final AbstractPageRangesComputer mergedPageRanges;
 
     public CloudColumnReadContext(IColumnProjectionInfo projectionInfo, IPhysicalDrive drive, BitSet plan) {
         this.operation = projectionInfo.getProjectorType();
@@ -65,7 +66,7 @@
         cloudOnlyColumns = new BitSet();
         columnCtx = new CloudMegaPageReadContext(operation, columnRanges, drive);
         projectedColumns = new BitSet();
-        mergedPageRanges = new MergedPageRanges(columnCtx, MAX_RANGES_COUNT);
+        mergedPageRanges = AbstractPageRangesComputer.create(MAX_RANGES_COUNT);
         if (operation == QUERY || operation == MODIFY) {
             for (int i = 0; i < projectionInfo.getNumberOfProjectedColumns(); i++) {
                 int columnIndex = projectionInfo.getColumnIndex(i);
@@ -113,10 +114,10 @@
     public ICachedPage pinNext(ColumnBTreeReadLeafFrame leafFrame, IBufferCache bufferCache, int fileId)
             throws HyracksDataException {
         int nextLeaf = leafFrame.getNextLeaf();
-        // Release the previous pages (including page0)
+        // Release the previous pages
         release(bufferCache);
+        // Release page0
         bufferCache.unpin(leafFrame.getPage(), this);
-
         // pin the next page0
         ICachedPage nextPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, nextLeaf), this);
         leafFrame.setPage(nextPage);
@@ -142,11 +143,11 @@
 
     private void pinAll(int fileId, int pageZeroId, int numberOfPages, IBufferCache bufferCache)
             throws HyracksDataException {
-        columnCtx.pin(bufferCache, fileId, pageZeroId, 1, numberOfPages, numberOfPages, MergedPageRanges.EMPTY);
+        columnCtx.pin(bufferCache, fileId, pageZeroId, 1, numberOfPages, ALL_PAGES);
     }
 
     private void pinProjected(int fileId, int pageZeroId, IBufferCache bufferCache) throws HyracksDataException {
-        mergedPageRanges.reset();
+        mergedPageRanges.clear();
         int[] columnsOrder = columnRanges.getColumnsOrder();
         int i = 0;
         int columnIndex = columnsOrder[i];
@@ -186,8 +187,9 @@
 
             mergedPageRanges.addRange(firstPageIdx, lastPageIdx);
         }
+
         // pin the calculated pageRanges
-        mergedPageRanges.pin(fileId, pageZeroId, bufferCache);
+        mergedPageRanges.pin(columnCtx, bufferCache, fileId, pageZeroId);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
index ed6e83f..29c9467 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
@@ -22,7 +22,6 @@
 import static org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider.DEFAULT;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.BitSet;
@@ -32,6 +31,7 @@
 import org.apache.hyracks.cloud.buffercache.context.BufferCacheCloudReadContextUtil;
 import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
 import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.cloud.io.stream.CloudInputStream;
 import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
 import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
@@ -47,22 +47,20 @@
 import org.apache.logging.log4j.Logger;
 
 @NotThreadSafe
-final class CloudMegaPageReadContext implements IBufferCacheReadContext {
+public final class CloudMegaPageReadContext implements IBufferCacheReadContext {
     private static final Logger LOGGER = LogManager.getLogger();
+    static final BitSet ALL_PAGES = new BitSet();
     private final ColumnProjectorType operation;
     private final ColumnRanges columnRanges;
     private final IPhysicalDrive drive;
     private final List<ICachedPage> pinnedPages;
 
     private int numberOfContiguousPages;
-    // For logging, to get actual number of wanted pages
-    private int numberOfWantedPages;
     private int pageCounter;
-    private InputStream gapStream;
+    private CloudInputStream gapStream;
 
-    // For debugging
-    private long streamOffset;
-    private long remainingStreamBytes;
+    // for debugging
+    int pageZeroId;
 
     CloudMegaPageReadContext(ColumnProjectorType operation, ColumnRanges columnRanges, IPhysicalDrive drive) {
         this.operation = operation;
@@ -71,13 +69,13 @@
         pinnedPages = new ArrayList<>();
     }
 
-    void pin(IBufferCache bufferCache, int fileId, int pageZeroId, int start, int numberOfPages,
-            int numberOfWantedPages, BitSet unwantedPages) throws HyracksDataException {
+    void pin(IBufferCache bufferCache, int fileId, int pageZeroId, int start, int numberOfPages, BitSet requestedPages)
+            throws HyracksDataException {
         closeStream();
         this.numberOfContiguousPages = numberOfPages;
-        this.numberOfWantedPages = numberOfWantedPages;
         pageCounter = 0;
-        doPin(bufferCache, fileId, pageZeroId, start, numberOfPages, numberOfWantedPages, unwantedPages);
+        this.pageZeroId = pageZeroId;
+        doPin(bufferCache, fileId, pageZeroId, start, numberOfPages, requestedPages);
     }
 
     @Override
@@ -86,10 +84,10 @@
         if (cachedPage.skipCloudStream()) {
             /*
              * This page is requested but the buffer cache has a valid copy in memory. Also, the page itself was
-             * requested to be read from the cloud. Since this page is valid, no buffer cache read() will be performed.
-             * As the buffer cache read() is also responsible for persisting the bytes read from the cloud, we can end
-             * up writing the bytes of this page in the position of another page. Therefore, we should skip the bytes
-             * for this particular page to avoid placing the bytes of this page into another page's position.
+             * requested to be read from the cloud (via 'gapStream'). Since this page is valid, no buffer cache read() will be
+             * performed. As the buffer cache read() is also responsible for persisting the bytes read from the cloud,
+             * we can end up writing the bytes of this page in the position of another page. Therefore, we should skip
+             * the bytes for this particular page to avoid placing the bytes of this page into another page's position.
              */
             skipStreamIfOpened(cachedPage);
         }
@@ -152,49 +150,28 @@
         pinnedPages.clear();
     }
 
-    void closeStream() throws HyracksDataException {
+    void closeStream() {
         if (gapStream != null) {
-            if (remainingStreamBytes != 0) {
-                LOGGER.warn("Closed cloud stream with nonzero bytes = {}", remainingStreamBytes);
-            }
-
-            try {
-                gapStream.close();
-                gapStream = null;
-            } catch (IOException e) {
-                throw HyracksDataException.create(e);
-            }
+            gapStream.close();
+            gapStream = null;
         }
     }
 
     private void readFromStream(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
             CachedPage cPage, boolean persist) throws HyracksDataException {
-        InputStream stream = getOrCreateStream(ioManager, fileHandle, cPage);
+        CloudInputStream stream = getOrCreateStream(ioManager, fileHandle, cPage);
         ByteBuffer buffer = header.getBuffer();
         buffer.position(0);
 
-        // If the stream consists of the unwanted pages,
-        // if the currentPage's offset is greater, this means
-        // the streamOffset is pointing to a previous page.
+        /*
+         * The 'gapStream' could point to an offset of an unwanted page due to range merging. For example, if
+         * 'gapStream' is currently at the offset of pageId = 5 and the cPage is for pageId = 7, then, the stream
+         * must be advanced to the cPage's offset (i.e., offset of pageId = 7) -- skipping pages 5 and 6.
+         */
+        gapStream.skipTo(cPage.getCompressedPageOffset());
 
-        // hence we should skip those many bytes.
-        // eg: if pageId(cPage) = 7 and streamOffset is pointing at 5
-        // then we need to jump 5,6 page worth of compressed size.
-        if (cPage.getCompressedPageOffset() > streamOffset) {
-            skipBytes(cPage.getCompressedPageOffset() - streamOffset);
-        }
-
-        try {
-            while (buffer.remaining() > 0) {
-                int length = stream.read(buffer.array(), buffer.position(), buffer.remaining());
-                if (length < 0) {
-                    throw new IllegalStateException("Stream should not be empty!");
-                }
-                buffer.position(buffer.position() + length);
-            }
-        } catch (IOException e) {
-            throw HyracksDataException.create(e);
-        }
+        // Get the page's data from the cloud
+        doStreamRead(stream, buffer);
 
         // Flip the buffer after reading to restore the correct position
         buffer.flip();
@@ -204,12 +181,9 @@
             ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
             BufferCacheCloudReadContextUtil.persist(cloudIOManager, fileHandle.getFileHandle(), buffer, offset);
         }
-
-        streamOffset += cPage.getCompressedPageSize();
-        remainingStreamBytes -= cPage.getCompressedPageSize();
     }
 
-    private InputStream getOrCreateStream(IOManager ioManager, BufferedFileHandle fileHandle, CachedPage cPage)
+    private CloudInputStream getOrCreateStream(IOManager ioManager, BufferedFileHandle fileHandle, CachedPage cPage)
             throws HyracksDataException {
         if (gapStream != null) {
             return gapStream;
@@ -219,34 +193,24 @@
         long offset = cPage.getCompressedPageOffset();
         int pageId = BufferedFileHandle.getPageId(cPage.getDiskPageId());
         long length = fileHandle.getPagesTotalSize(pageId, requiredNumOfPages);
-        remainingStreamBytes = length;
-        streamOffset = offset;
-        LOGGER.info(
-                "Cloud stream read for pageId={} starting from pageCounter={} out of "
-                        + "numberOfContiguousPages={} with numberOfWantedPages={}"
-                        + " (streamOffset = {}, remainingStreamBytes = {})",
-                pageId, pageCounter, numberOfContiguousPages, numberOfWantedPages, streamOffset, remainingStreamBytes);
-
         ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
         gapStream = cloudIOManager.cloudRead(fileHandle.getFileHandle(), offset, length);
 
+        LOGGER.info(
+                "Cloud stream read for pageId={} starting from pageCounter={} out of "
+                        + "numberOfContiguousPages={}. pageZeroId={} stream: {}",
+                pageId, pageCounter, numberOfContiguousPages, pageZeroId, gapStream);
+
         return gapStream;
     }
 
-    private void skipBytes(long length) throws HyracksDataException {
-        if (gapStream == null) {
-            return;
-        }
-
+    private void doStreamRead(CloudInputStream stream, ByteBuffer buffer) throws HyracksDataException {
+        int length = buffer.remaining();
         try {
-            long lengthToSkip = length;
-            while (length > 0) {
-                length -= gapStream.skip(length);
-            }
-            streamOffset += lengthToSkip;
-            remainingStreamBytes -= lengthToSkip;
-        } catch (IOException e) {
-            throw HyracksDataException.create(e);
+            stream.read(buffer);
+        } catch (Throwable th) {
+            LOGGER.warn("Failed to READ {} bytes from stream {}", length, stream);
+            throw HyracksDataException.create(th);
         }
     }
 
@@ -255,34 +219,29 @@
             return;
         }
 
+        // Skip this page's content: advance the stream to the page's offset plus its compressed size
+        long newOffset = cPage.getCompressedPageOffset() + cPage.getCompressedPageSize();
         try {
-            long remaining = cPage.getCompressedPageSize();
-            while (remaining > 0) {
-                remaining -= gapStream.skip(remaining);
-            }
-            streamOffset += cPage.getCompressedPageSize();
-            remainingStreamBytes -= cPage.getCompressedPageSize();
+            gapStream.skipTo(newOffset);
         } catch (IOException e) {
+            LOGGER.warn("Failed to SKIP to new offset {} from stream {}", newOffset, gapStream);
             throw HyracksDataException.create(e);
         }
     }
 
     private void doPin(IBufferCache bufferCache, int fileId, int pageZeroId, int start, int numberOfPages,
-            int numberOfWantedPages, BitSet unwantedPages) throws HyracksDataException {
+            BitSet requestedPages) throws HyracksDataException {
         for (int i = start; i < start + numberOfPages; i++) {
-            int pageId = pageZeroId + i;
-            long dpid = BufferedFileHandle.getDiskPageId(fileId, pageId);
             try {
-                if (!unwantedPages.get(pageId)) {
+                if (requestedPages == ALL_PAGES || requestedPages.get(i)) {
+                    int pageId = pageZeroId + i;
+                    long dpid = BufferedFileHandle.getDiskPageId(fileId, pageId);
                     pinnedPages.add(bufferCache.pin(dpid, this));
                 }
                 pageCounter++;
             } catch (Throwable e) {
-                LOGGER.error(
-                        "Error while pinning page number {} with number of pages streamed {}, "
-                                + "with actually wanted number of pages {}"
-                                + "(streamOffset:{}, remainingStreamBytes: {}) columnRanges:\n {}",
-                        i, numberOfPages, numberOfWantedPages, streamOffset, remainingStreamBytes, columnRanges);
+                LOGGER.error("Error while pinning page number {} with number of pages {}. "
+                        + "stream: {}, columnRanges:\n {}", i, numberOfPages, gapStream, columnRanges);
                 throw e;
             }
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRanges.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRanges.java
deleted file mode 100644
index c0c2fc9..0000000
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRanges.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
-
-import java.util.BitSet;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IntPairUtil;
-import org.apache.hyracks.storage.common.buffercache.IBufferCache;
-
-import it.unimi.dsi.fastutil.ints.IntArrayList;
-import it.unimi.dsi.fastutil.ints.IntList;
-import it.unimi.dsi.fastutil.longs.LongArrayPriorityQueue;
-import it.unimi.dsi.fastutil.longs.LongPriorityQueue;
-
-/**
- * Merge the given ranges such that the maximum number of ranges <= N.
- * Merge should be greedy as the range having lower gaps should be given priority.
- */
-public class MergedPageRanges {
-    public static final BitSet EMPTY = new BitSet();
-    private final CloudMegaPageReadContext columnCtx;
-    private final int numRequiredRanges;
-    private final IntList pageRanges;
-    private final LongPriorityQueue gapRanges;
-    // indicates the index of the ranges which are merged
-    private final BitSet mergedIndex = new BitSet();
-    // indicates a page is requested or not
-    private final BitSet unwantedPages = new BitSet();
-    // indicates the extra pages got included while a merge
-    private int currentIndex = 0;
-    private int numRanges;
-
-    MergedPageRanges(CloudMegaPageReadContext columnCtx, int numRequiredRanges) {
-        this.numRequiredRanges = numRequiredRanges;
-        this.pageRanges = new IntArrayList(40);
-        this.gapRanges = new LongArrayPriorityQueue(IntPairUtil.FIRST_COMPARATOR);
-        this.columnCtx = columnCtx;
-        this.numRanges = 0;
-    }
-
-    public void reset() {
-        mergedIndex.clear();
-        pageRanges.clear();
-        gapRanges.clear();
-        numRanges = 0;
-        currentIndex = 0;
-    }
-
-    public void addRange(int rangeStart, int rangeEnd) {
-        pageRanges.add(rangeStart);
-        pageRanges.add(rangeEnd);
-        numRanges++;
-    }
-
-    public void mergeRanges() {
-        // totalMerges = totalRanges - MAXIMUM_RESULTANT_RANGES
-        int merges = numRanges - numRequiredRanges;
-        for (int i = 2; i < pageRanges.size(); i += 2) {
-            int previousRangeEnd = pageRanges.getInt(i - 1);
-            int currentRangeStart = pageRanges.getInt(i);
-            // this could be optimized to just enqueue "merges" ranges,
-            // but won't be much diff as the number of ranges gonna be small
-            long gap = IntPairUtil.of(currentRangeStart - previousRangeEnd, i / 2);
-            gapRanges.enqueue(gap);
-        }
-
-        int count = 0;
-        while (count < merges) {
-            // extract the lower 32 bits for the index.
-            int index = IntPairUtil.getSecond(gapRanges.dequeueLong());
-            // set the bit from [index - 1, index] indicating
-            // the index and index-1 are merged.
-            mergedIndex.set(index - 1, index + 1);
-            count++;
-        }
-    }
-
-    public void pin(int fileId, int pageZeroId, IBufferCache bufferCache) throws HyracksDataException {
-        // since the numRanges are already within set threshold
-        if (numRanges <= numRequiredRanges) {
-            pinWithoutMerge(fileId, pageZeroId, bufferCache);
-            return;
-        }
-        pinWithMerge(fileId, pageZeroId, bufferCache);
-    }
-
-    private void pinWithoutMerge(int fileId, int pageZeroId, IBufferCache bufferCache) throws HyracksDataException {
-        for (int pageIndex = 1; pageIndex < pageRanges.size(); pageIndex += 2) {
-            int lastPageIndex = pageRanges.getInt(pageIndex);
-            int firstPageIndex = pageRanges.getInt(pageIndex - 1);
-            int numberOfPages = lastPageIndex - firstPageIndex + 1;
-            columnCtx.pin(bufferCache, fileId, pageZeroId, firstPageIndex, numberOfPages, numberOfPages, EMPTY);
-        }
-    }
-
-    private void pinWithMerge(int fileId, int pageZeroId, IBufferCache bufferCache) throws HyracksDataException {
-        // merge the range based on the numRequiredRanges.
-        mergeRanges();
-        // go through page ranges and pin the required ranges.
-        int rangeCnt = 0;
-        while (rangeCnt < numRequiredRanges) {
-            unwantedPages.clear();
-            long mergedRange = getNextRange();
-
-            int firstRangeIdx = IntPairUtil.getFirst(mergedRange);
-            int lastRangeIdx = IntPairUtil.getSecond(mergedRange);
-
-            // since the ranges are flattened out in the pageRanges.
-            // hence ith index's element would be at [2*i, 2*i + 1]
-            int firstRangeStart = pageRanges.getInt(2 * firstRangeIdx);
-            int firstRangeEnd = pageRanges.getInt(2 * firstRangeIdx + 1);
-            int lastRangeStart = pageRanges.getInt(2 * lastRangeIdx);
-            int lastRangeEnd = pageRanges.getInt(2 * lastRangeIdx + 1);
-
-            int numberOfPages = lastRangeEnd - firstRangeStart + 1;
-            // Number of unwanted pages will be zero, when there is just a single range (i.e. no merge)
-            boolean areUnwantedPages = firstRangeIdx != lastRangeIdx;
-            // and when the there is no extra page being fetched. eg: [1 2] [3 4]
-            // for: [ 1 2 ] [ 4 5 ] [ 7 8 ] -> [ 1 8 ] ( fromIndex = 0, toIndex = 2 )
-            // numberOfUnwantedPages = (4 - 2 - 1) + (7 - 5 -1) = 2
-            areUnwantedPages = areUnwantedPages && (lastRangeStart - firstRangeEnd > 1);
-            int numberOfUnwantedPages = 0;
-            if (areUnwantedPages) {
-                // iterate through the index and mark the gaps
-                for (int fromIndex = firstRangeIdx; fromIndex < lastRangeIdx; fromIndex++) {
-                    // Gap = V (2 * (fromIndex+1) ) - V(fromIndex * 2 + 1)
-                    // V(index) = value at the index
-                    int fromRangeEnd = pageRanges.getInt(2 * fromIndex + 1);
-                    int toRangeStart = pageRanges.getInt(2 * (fromIndex + 1));
-                    // fromRangeEnd != toRangeStart, as they would have been merged already
-                    int rangeGap = (fromRangeEnd == toRangeStart) ? 0 : toRangeStart - fromRangeEnd - 1;
-                    if (rangeGap > 0) {
-                        unwantedPages.set(fromRangeEnd + 1, toRangeStart);
-                    }
-                    numberOfUnwantedPages += rangeGap;
-                }
-            }
-
-            columnCtx.pin(bufferCache, fileId, pageZeroId, firstRangeStart, numberOfPages,
-                    numberOfPages - numberOfUnwantedPages, unwantedPages);
-            rangeCnt++;
-        }
-    }
-
-    // making package-private for MergedPageRangesTest
-    long getNextRange() {
-        int fromIndex = currentIndex;
-        int endIndex = currentIndex;
-        int toIndex;
-
-        // move till we have a set index, indicating all the indexes
-        // are merged into one range.
-        while (endIndex < numRanges && mergedIndex.get(endIndex)) {
-            endIndex++;
-        }
-
-        if (fromIndex == endIndex) {
-            currentIndex = endIndex + 1;
-            toIndex = endIndex;
-        } else {
-            currentIndex = endIndex;
-            toIndex = endIndex - 1;
-        }
-
-        return IntPairUtil.of(fromIndex, toIndex);
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/PageRangesComputer.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/PageRangesComputer.java
new file mode 100644
index 0000000..28612c4
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/PageRangesComputer.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
+
+import java.util.Arrays;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IntPairUtil;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.longs.LongArrayPriorityQueue;
+import it.unimi.dsi.fastutil.longs.LongPriorityQueue;
+
+/**
+ * Merges the given page ranges such that the resulting number of ranges is <= N.
+ * Merging is greedy: ranges separated by the smallest gaps are merged first.
+ */
+final class PageRangesComputer extends AbstractPageRangesComputer {
+    static final int SINGLE_RANGE = 0;
+    static final int EACH_RANGE = 1;
+    static final int BOUNDARIES = 2;
+
+    private final int maxNumberOfRanges;
+    private final LongPriorityQueue gapRanges;
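+    // Package-private (rather than private) so MergedPageRagesTest can inspect the computed state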
+    final IntList pageRanges;
+    final int[] rangeBoundaries;
+
+    PageRangesComputer(int maxNumberOfRanges) {
+        this.maxNumberOfRanges = maxNumberOfRanges;
+        pageRanges = new IntArrayList(INITIAL_SIZE);
+        gapRanges = new LongArrayPriorityQueue(IntPairUtil.FIRST_COMPARATOR);
+        rangeBoundaries = new int[maxNumberOfRanges];
+    }
+
+    @Override
+    int getMaxNumberOfRanges() {
+        return maxNumberOfRanges;
+    }
+
+    @Override
+    void clear() {
+        pageRanges.clear();
+        gapRanges.clear();
+        requestedPages.clear();
+    }
+
+    @Override
+    void addRange(int rangeStart, int rangeEnd) {
+        int previousEnd = pageRanges.size() - 1;
+
+        pageRanges.add(rangeStart);
+        pageRanges.add(rangeEnd);
+        requestedPages.set(rangeStart, rangeEnd + 1);
+
+        if (previousEnd > 0) {
+            int maxNumberOfCuts = maxNumberOfRanges - 1;
+            int gapSize = rangeStart - pageRanges.getInt(previousEnd) - 1;
+            if (gapRanges.size() < maxNumberOfCuts) {
+                // Haven't reached the maximum number of cuts yet; track this gap
+                gapRanges.enqueue(IntPairUtil.of(gapSize, previousEnd));
+            } else if (IntPairUtil.getFirst(gapRanges.firstLong()) < gapSize) {
+                // Found a bigger gap. Remove the smallest and add this new bigger gap
+                gapRanges.dequeueLong();
+                gapRanges.enqueue(IntPairUtil.of(gapSize, previousEnd));
+            }
+            // Otherwise, this gap is smaller than the smallest tracked gap; ignore it
+        }
+    }
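+
+    // Worked example (mirroring MergedPageRagesTest): adding [1,3] [5,7] [8,12] [16,19] [23,26]
+    // with maxNumberOfRanges = 3 tracks at most two cuts. After all adds, the queue holds the two
+    // largest gaps (both of size 3, before pages 16 and 23); the smaller gaps (1 and 0) were evicted.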
+
+    @Override
+    void pin(CloudMegaPageReadContext ctx, IBufferCache bufferCache, int fileId, int pageZeroId)
+            throws HyracksDataException {
+        int mergeResult = mergeRanges();
+        switch (mergeResult) {
+            case SINGLE_RANGE:
+                pinAsSingleRange(ctx, bufferCache, fileId, pageZeroId);
+                break;
+            case EACH_RANGE:
+                pinEachRange(ctx, bufferCache, fileId, pageZeroId);
+                break;
+            default:
+                pinMergedRanges(ctx, bufferCache, fileId, pageZeroId);
+                break;
+        }
+    }
+
+    int mergeRanges() {
+        int i = 0;
+        int maxGap = 0;
+        while (!gapRanges.isEmpty()) {
+            long pair = gapRanges.dequeueLong();
+            maxGap = Math.max(maxGap, IntPairUtil.getFirst(pair));
+            rangeBoundaries[i] = IntPairUtil.getSecond(pair);
+            i++;
+        }
+
+        if (maxGap == 1) {
+            // The biggest gap is 1; merge all the ranges into a single range
+            return SINGLE_RANGE;
+        }
+
+        if (getNumberOfRanges() <= maxNumberOfRanges) {
+            // The number of ranges is within the limit; pin each range separately
+            return EACH_RANGE;
+        }
+
+        // Set the last boundary
+        rangeBoundaries[maxNumberOfRanges - 1] = pageRanges.size() - 1;
+
+        // Sort cuts smallest to largest
+        Arrays.sort(rangeBoundaries);
+
+        // Use the boundaries to cut the ranges into separate ones
+        return BOUNDARIES;
+    }
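+
+    // Continuing the example above: the cut boundaries are end indexes 5 and 7 of 'pageRanges', plus
+    // the final boundary 9 (the last index), so pinMergedRanges pins [1,12], [16,19], and [23,26].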
+
+    private int getNumberOfRanges() {
+        return pageRanges.size() / 2;
+    }
+
+    private void pinAsSingleRange(CloudMegaPageReadContext ctx, IBufferCache bufferCache, int fileId, int pageZeroId)
+            throws HyracksDataException {
+        int start = pageRanges.getInt(0);
+        int end = pageRanges.getInt(pageRanges.size() - 1);
+        int numberOfPages = end - start + 1;
+        ctx.pin(bufferCache, fileId, pageZeroId, start, numberOfPages, requestedPages);
+    }
+
+    private void pinEachRange(CloudMegaPageReadContext ctx, IBufferCache bufferCache, int fileId, int pageZeroId)
+            throws HyracksDataException {
+        // Iterate over the flattened (start, end) pairs; 'pageRanges' holds two ints per range
+        for (int i = 0; i < pageRanges.size(); i += 2) {
+            int start = pageRanges.getInt(i);
+            int end = pageRanges.getInt(i + 1);
+            int numberOfPages = end - start + 1;
+            ctx.pin(bufferCache, fileId, pageZeroId, start, numberOfPages, requestedPages);
+        }
+    }
+
+    private void pinMergedRanges(CloudMegaPageReadContext ctx, IBufferCache bufferCache, int fileId, int pageZeroId)
+            throws HyracksDataException {
+        int startIndex = 0;
+        for (int i = 0; i < rangeBoundaries.length; i++) {
+            int endIndex = rangeBoundaries[i];
+            int rangeStart = pageRanges.getInt(startIndex);
+            int rangeEnd = pageRanges.getInt(endIndex);
+            int numberOfPages = rangeEnd - rangeStart + 1;
+            ctx.pin(bufferCache, fileId, pageZeroId, rangeStart, numberOfPages, requestedPages);
+
+            // Start from the next cut
+            startIndex = endIndex + 1;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "{rangeBoundaries: " + Arrays.toString(rangeBoundaries) + ", pageRanges: " + pageRanges
+                + ", requestedPages: " + requestedPages + "}";
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/SinglePageRangeComputer.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/SinglePageRangeComputer.java
new file mode 100644
index 0000000..4c9536b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/SinglePageRangeComputer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+
+/**
+ * Computes a single page range (maximum number of ranges = 1): all added ranges are merged
+ * into one contiguous range, and only the originally requested pages within it are pinned.
+ */
+final class SinglePageRangeComputer extends AbstractPageRangesComputer {
+    int rangeStart;
+    int rangeEnd;
+    private int numberOfRanges;
+
+    @Override
+    int getMaxNumberOfRanges() {
+        return 1;
+    }
+
+    @Override
+    void clear() {
+        numberOfRanges = 0;
+        requestedPages.clear();
+    }
+
+    @Override
+    void addRange(int start, int end) {
+        if (numberOfRanges++ == 0) {
+            rangeStart = start;
+        }
+        rangeEnd = end;
+        requestedPages.set(start, end + 1);
+    }
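+
+    // e.g., addRange(1, 3) followed by addRange(9, 10) yields the single range [1, 10]; pages 4..8
+    // fall in the gap, so they are covered by the cloud read but never pinned (not in 'requestedPages')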
+
+    @Override
+    void pin(CloudMegaPageReadContext ctx, IBufferCache bufferCache, int fileId, int pageZeroId)
+            throws HyracksDataException {
+        int numberOfPages = rangeEnd - rangeStart + 1;
+        ctx.pin(bufferCache, fileId, pageZeroId, rangeStart, numberOfPages, requestedPages);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRagesTest.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRagesTest.java
index 169c14d..c5210d3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRagesTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRagesTest.java
@@ -18,40 +18,29 @@
  */
 package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
 
-import java.util.List;
-
-import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IntPairUtil;
 import org.junit.Assert;
 import org.junit.Test;
 
-import it.unimi.dsi.fastutil.Pair;
+import it.unimi.dsi.fastutil.ints.IntList;
 
 public class MergedPageRagesTest {
-    private MergedPageRanges mergedPageRanges;
-    private final CloudMegaPageReadContext cloudMegaPageReadContext = null;
 
     @Test
     public void mergePageRanges1() {
         int[] pageRanges = new int[] { 1, 3, 5, 7, 8, 12, 16, 19, 23, 26 };
 
         int requiredRangeCount = 3;
-        mergedPageRanges = new MergedPageRanges(cloudMegaPageReadContext, requiredRangeCount);
+        AbstractPageRangesComputer mergedPageRanges = AbstractPageRangesComputer.create(requiredRangeCount);
         for (int i = 0; i < pageRanges.length; i += 2) {
             mergedPageRanges.addRange(pageRanges[i], pageRanges[i + 1]);
         }
 
-        mergedPageRanges.mergeRanges();
         // the gaps (pages strictly between ranges) are ( 1, 0, 3, 3 )
         // since we need at most 3 ranges, cuts are made at the two largest gaps.
         // hence the resultant page ranges := [ 1, 12 ], [ 16, 19 ], [ 23, 26 ]
-        List<Pair<Integer, Integer>> ranges = List.of(Pair.of(0, 2), Pair.of(3, 3), Pair.of(4, 4));
-
-        for (int i = 0; i < requiredRangeCount; i++) {
-            long nextRange = mergedPageRanges.getNextRange();
-            Assert.assertEquals(ranges.get(i).first().intValue(), IntPairUtil.getFirst(nextRange));
-            Assert.assertEquals(ranges.get(i).second().intValue(), IntPairUtil.getSecond(nextRange));
-        }
+        int[] expectedRanges = new int[] { 1, 12, 16, 19, 23, 26 };
+        assertResult(mergedPageRanges, expectedRanges);
     }
 
     @Test
@@ -59,23 +48,17 @@
         int[] pageRanges = new int[] { 1, 3, 5, 7, 8, 12, 16, 19, 23, 26 };
 
         int requiredRangeCount = 1;
-        mergedPageRanges = new MergedPageRanges(cloudMegaPageReadContext, requiredRangeCount);
+        AbstractPageRangesComputer mergedPageRanges = AbstractPageRangesComputer.create(requiredRangeCount);
         for (int i = 0; i < pageRanges.length; i += 2) {
             mergedPageRanges.addRange(pageRanges[i], pageRanges[i + 1]);
         }
 
-        mergedPageRanges.mergeRanges();
         // the gaps (pages strictly between ranges) are ( 1, 0, 3, 3 )
         // since only 1 range is allowed, all 5 ranges collapse into one.
         // hence the resultant page range := [ 1, 26 ]
-        List<Pair<Integer, Integer>> ranges = List.of(Pair.of(0, 4));
-
-        for (int i = 0; i < requiredRangeCount; i++) {
-            long nextRange = mergedPageRanges.getNextRange();
-            Assert.assertEquals(ranges.get(i).first().intValue(), IntPairUtil.getFirst(nextRange));
-            Assert.assertEquals(ranges.get(i).second().intValue(), IntPairUtil.getSecond(nextRange));
-        }
+        int[] expectedRanges = new int[] { 1, 26 };
+        assertResult(mergedPageRanges, expectedRanges);
     }
 
     @Test
@@ -83,22 +66,92 @@
         int[] pageRanges = new int[] { 1, 3, 5, 7, 8, 12, 16, 19, 23, 26 };
 
         int requiredRangeCount = 8;
-        mergedPageRanges = new MergedPageRanges(cloudMegaPageReadContext, requiredRangeCount);
+        AbstractPageRangesComputer mergedPageRanges = AbstractPageRangesComputer.create(requiredRangeCount);
         for (int i = 0; i < pageRanges.length; i += 2) {
             mergedPageRanges.addRange(pageRanges[i], pageRanges[i + 1]);
         }
 
-        mergedPageRanges.mergeRanges();
         // the gaps (pages strictly between ranges) are ( 1, 0, 3, 3 )
         // since up to 8 ranges are allowed and only 5 exist, no merge is done.
-        List<Pair<Integer, Integer>> ranges =
-                List.of(Pair.of(0, 0), Pair.of(1, 1), Pair.of(2, 2), Pair.of(3, 3), Pair.of(4, 4));
+        int[] expectedRanges = new int[] { 1, 3, 5, 7, 8, 12, 16, 19, 23, 26 };
 
-        for (int i = 0; i < ranges.size(); i++) {
-            long nextRange = mergedPageRanges.getNextRange();
-            Assert.assertEquals(ranges.get(i).first().intValue(), IntPairUtil.getFirst(nextRange));
-            Assert.assertEquals(ranges.get(i).second().intValue(), IntPairUtil.getSecond(nextRange));
+        assertResult(mergedPageRanges, expectedRanges);
+    }
+
+    @Test
+    public void singlePageGap() {
+        int[] pageRanges = new int[] { 1, 3, 5, 7, 9, 10 };
+
+        int requiredRangeCount = 3;
+        AbstractPageRangesComputer mergedPageRanges = AbstractPageRangesComputer.create(requiredRangeCount);
+        for (int i = 0; i < pageRanges.length; i += 2) {
+            mergedPageRanges.addRange(pageRanges[i], pageRanges[i + 1]);
+        }
+
+        // Since the gaps between the ranges are all 1, the result should be a single range
+        int[] expectedRanges = new int[] { 1, 10 };
+
+        assertResult(mergedPageRanges, expectedRanges);
+    }
+
+    private void assertResult(AbstractPageRangesComputer mergedPageRanges, int[] expectedRanges) {
+        if (mergedPageRanges.getMaxNumberOfRanges() == 1) {
+            assertSinglePageRangeComputer((SinglePageRangeComputer) mergedPageRanges, expectedRanges);
+            return;
+        }
+        PageRangesComputer pageRangesComputer = (PageRangesComputer) mergedPageRanges;
+        int mergeResult = pageRangesComputer.mergeRanges();
+        switch (mergeResult) {
+            case PageRangesComputer.SINGLE_RANGE:
+                assertSingleRange(pageRangesComputer, expectedRanges);
+                break;
+            case PageRangesComputer.EACH_RANGE:
+                assertEachRange(pageRangesComputer, expectedRanges);
+                break;
+            default:
+                assertBoundaries(pageRangesComputer, expectedRanges);
+                break;
+        }
+    }
+
+    private void assertSinglePageRangeComputer(SinglePageRangeComputer mergedPageRanges, int[] expectedRanges) {
+        Assert.assertEquals(2, expectedRanges.length);
+        Assert.assertEquals(expectedRanges[0], mergedPageRanges.rangeStart);
+        Assert.assertEquals(expectedRanges[1], mergedPageRanges.rangeEnd);
+    }
+
+    private void assertSingleRange(PageRangesComputer mergedPageRanges, int[] expectedRanges) {
+        Assert.assertEquals(2, expectedRanges.length);
+        IntList pageRanges = mergedPageRanges.pageRanges;
+        Assert.assertEquals(expectedRanges[0], pageRanges.getInt(0));
+        Assert.assertEquals(expectedRanges[1], pageRanges.getInt(pageRanges.size() - 1));
+    }
+
+    private void assertEachRange(PageRangesComputer mergedPageRanges, int[] expectedRanges) {
+        IntList pageRanges = mergedPageRanges.pageRanges;
+        Assert.assertEquals(expectedRanges.length, pageRanges.size());
+        for (int i = 0; i < expectedRanges.length; i++) {
+            Assert.assertEquals(expectedRanges[i], pageRanges.getInt(i));
+        }
+    }
+
+    private void assertBoundaries(PageRangesComputer mergedPageRanges, int[] expectedRanges) {
+        int[] rangeBoundaries = mergedPageRanges.rangeBoundaries;
+        Assert.assertEquals(expectedRanges.length / 2, rangeBoundaries.length);
+        IntList pageRanges = mergedPageRanges.pageRanges;
+        int startIndex = 0;
+        int expectedRange = 0;
+        for (int i = 0; i < rangeBoundaries.length; i++) {
+            int endIndex = rangeBoundaries[i];
+            int rangeStart = pageRanges.getInt(startIndex);
+            int rangeEnd = pageRanges.getInt(endIndex);
+
+            Assert.assertEquals(expectedRanges[expectedRange++], rangeStart);
+            Assert.assertEquals(expectedRanges[expectedRange++], rangeEnd);
+
+            // Start from the next cut
+            startIndex = endIndex + 1;
         }
     }
 }