[NO ISSUE][STO] Column cloud refactoring

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Refactor common code and some cleanups in cloud
column read path.

Change-Id: I2e98a86aa4d7f6d25b85b4ae86bbd3232a6ef0ac
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18382
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
index 5c6fe09..eacf4fd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
@@ -38,8 +38,7 @@
  * Computes columns offsets, lengths, and pages
  */
 public final class ColumnRanges {
-    private static final LongComparator OFFSET_COMPARATOR =
-            (x, y) -> Integer.compare(getOffsetFromPair(x), getOffsetFromPair(y));
+    private static final LongComparator OFFSET_COMPARATOR = IntPairUtil.FIRST_COMPARATOR;
     private final int numberOfPrimaryKeys;
 
     // For eviction
@@ -112,13 +111,14 @@
         // Get the number of columns in a page
         int numberOfColumns = leafFrame.getNumberOfColumns();
         for (int i = 0; i < numberOfColumns; i++) {
-            long offset = leafFrame.getColumnOffset(i);
+            int offset = leafFrame.getColumnOffset(i);
             // Set the first 32-bits to the offset and the second 32-bits to columnIndex
-            offsetColumnIndexPairs[i] = (offset << 32) + i;
+            offsetColumnIndexPairs[i] = IntPairUtil.of(offset, i);
         }
 
         // Set artificial offset to determine the last column's length
-        offsetColumnIndexPairs[numberOfColumns] = (leafFrame.getMegaLeafNodeLengthInBytes() << 32) + numberOfColumns;
+        int megaLeafLength = leafFrame.getMegaLeafNodeLengthInBytes();
+        offsetColumnIndexPairs[numberOfColumns] = IntPairUtil.of(megaLeafLength, numberOfColumns);
 
         // Sort the pairs by offset (i.e., lowest offset first)
         LongArrays.stableSort(offsetColumnIndexPairs, 0, numberOfColumns, OFFSET_COMPARATOR);
@@ -251,11 +251,11 @@
     }
 
     private static int getOffsetFromPair(long pair) {
-        return (int) (pair >> 32);
+        return IntPairUtil.getFirst(pair);
     }
 
     private static int getColumnIndexFromPair(long pair) {
-        return (int) pair;
+        return IntPairUtil.getSecond(pair);
     }
 
     private void setCloudOnlyAndEvictablePages(int columnIndex, BitSet cloudOnlyColumns, BitSet evictableColumns,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/IntPairUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/IntPairUtil.java
new file mode 100644
index 0000000..c1fe0cf
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/IntPairUtil.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud;
+
+import it.unimi.dsi.fastutil.longs.LongComparator;
+
+public class IntPairUtil {
+    private IntPairUtil() {
+    }
+
+    public static final LongComparator FIRST_COMPARATOR = (x, y) -> Integer.compare(getFirst(x), getFirst(y));
+    public static final LongComparator SECOND_COMPARATOR = (x, y) -> Integer.compare(getSecond(x), getSecond(y));
+
+    public static int getFirst(long pair) {
+        return (int) (pair >> 32);
+    }
+
+    public static int getSecond(long pair) {
+        return (int) pair;
+    }
+
+    public static long of(int first, int second) {
+        return ((long) first << 32) + second;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
index fb3019f..119984c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
@@ -24,9 +24,7 @@
 import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.QUERY;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.BitSet;
-import java.util.List;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
@@ -42,23 +40,18 @@
 import org.apache.hyracks.storage.common.buffercache.CachedPage;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
-import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
 import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 import org.apache.hyracks.util.annotations.NotThreadSafe;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 @NotThreadSafe
 public final class CloudColumnReadContext implements IColumnReadContext {
-    private static final Logger LOGGER = LogManager.getLogger();
     private final ColumnProjectorType operation;
     private final IPhysicalDrive drive;
     private final BitSet plan;
     private final BitSet cloudOnlyColumns;
     private final ColumnRanges columnRanges;
     private final CloudMegaPageReadContext columnCtx;
-    private final List<ICachedPage> pinnedPages;
     private final BitSet projectedColumns;
 
     public CloudColumnReadContext(IColumnProjectionInfo projectionInfo, IPhysicalDrive drive, BitSet plan) {
@@ -68,7 +61,6 @@
         columnRanges = new ColumnRanges(projectionInfo.getNumberOfPrimaryKeys());
         cloudOnlyColumns = new BitSet();
         columnCtx = new CloudMegaPageReadContext(operation, columnRanges, drive);
-        pinnedPages = new ArrayList<>();
         projectedColumns = new BitSet();
         if (operation == QUERY || operation == MODIFY) {
             for (int i = 0; i < projectionInfo.getNumberOfProjectedColumns(); i++) {
@@ -146,8 +138,7 @@
 
     private void pinAll(int fileId, int pageZeroId, int numberOfPages, IBufferCache bufferCache)
             throws HyracksDataException {
-        columnCtx.prepare(numberOfPages);
-        pin(bufferCache, fileId, pageZeroId, 1, numberOfPages);
+        columnCtx.pin(bufferCache, fileId, pageZeroId, 1, numberOfPages);
     }
 
     private void pinProjected(int fileId, int pageZeroId, IBufferCache bufferCache) throws HyracksDataException {
@@ -192,23 +183,7 @@
             }
 
             int numberOfPages = lastPageIdx - firstPageIdx + 1;
-            columnCtx.prepare(numberOfPages);
-            pin(bufferCache, fileId, pageZeroId, firstPageIdx, numberOfPages);
-        }
-    }
-
-    private void pin(IBufferCache bufferCache, int fileId, int pageZeroId, int start, int numberOfPages)
-            throws HyracksDataException {
-        for (int i = start; i < start + numberOfPages; i++) {
-            long dpid = BufferedFileHandle.getDiskPageId(fileId, pageZeroId + i);
-            try {
-                pinnedPages.add(bufferCache.pin(dpid, columnCtx));
-            } catch (Throwable e) {
-                LOGGER.error("Error while pinning page number {} with number of pages {}. {}\n columnRanges:\n {}", i,
-                        numberOfPages, columnCtx, columnRanges);
-                throw e;
-            }
-
+            columnCtx.pin(bufferCache, fileId, pageZeroId, firstPageIdx, numberOfPages);
         }
     }
 
@@ -220,15 +195,7 @@
 
     @Override
     public void close(IBufferCache bufferCache) throws HyracksDataException {
-        release(pinnedPages, bufferCache, columnCtx);
-        columnCtx.close();
-    }
-
-    private static void release(List<ICachedPage> pinnedPages, IBufferCache bufferCache, IBufferCacheReadContext ctx)
-            throws HyracksDataException {
-        for (int i = 0; i < pinnedPages.size(); i++) {
-            bufferCache.unpin(pinnedPages.get(i), ctx);
-        }
-        pinnedPages.clear();
+        columnCtx.unpinAll(bufferCache);
+        columnCtx.closeStream();
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
index 2679d33..cd1e8ed 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
@@ -24,6 +24,8 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.cloud.buffercache.context.BufferCacheCloudReadContextUtil;
@@ -34,6 +36,7 @@
 import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
 import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
 import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
 import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
@@ -48,6 +51,7 @@
     private final ColumnProjectorType operation;
     private final ColumnRanges columnRanges;
     private final IPhysicalDrive drive;
+    private final List<ICachedPage> pinnedPages;
 
     private int numberOfContiguousPages;
     private int pageCounter;
@@ -61,12 +65,15 @@
         this.operation = operation;
         this.columnRanges = columnRanges;
         this.drive = drive;
+        pinnedPages = new ArrayList<>();
     }
 
-    public void prepare(int numberOfContiguousPages) throws HyracksDataException {
-        close();
-        this.numberOfContiguousPages = numberOfContiguousPages;
+    void pin(IBufferCache bufferCache, int fileId, int pageZeroId, int start, int numberOfPages)
+            throws HyracksDataException {
+        closeStream();
+        this.numberOfContiguousPages = numberOfPages;
         pageCounter = 0;
+        doPin(bufferCache, fileId, pageZeroId, start, numberOfPages);
     }
 
     @Override
@@ -81,7 +88,6 @@
              * for this particular page to avoid placing the bytes of this page into another page's position.
              */
             skipStreamIfOpened(cachedPage);
-            pageCounter++;
         }
     }
 
@@ -131,15 +137,18 @@
             skipStreamIfOpened(cPage);
         }
 
-        if (++pageCounter == numberOfContiguousPages) {
-            close();
-        }
-
         // Finally process the header
         return DEFAULT.processHeader(ioManager, fileHandle, header, cPage);
     }
 
-    void close() throws HyracksDataException {
+    void unpinAll(IBufferCache bufferCache) throws HyracksDataException {
+        for (int i = 0; i < pinnedPages.size(); i++) {
+            bufferCache.unpin(pinnedPages.get(i), this);
+        }
+        pinnedPages.clear();
+    }
+
+    void closeStream() throws HyracksDataException {
         if (gapStream != null) {
             if (remainingStreamBytes != 0) {
                 LOGGER.warn("Closed cloud stream with nonzero bytes = {}", remainingStreamBytes);
@@ -225,4 +234,20 @@
         }
     }
 
+    private void doPin(IBufferCache bufferCache, int fileId, int pageZeroId, int start, int numberOfPages)
+            throws HyracksDataException {
+        for (int i = start; i < start + numberOfPages; i++) {
+            long dpid = BufferedFileHandle.getDiskPageId(fileId, pageZeroId + i);
+            try {
+                pinnedPages.add(bufferCache.pin(dpid, this));
+                pageCounter++;
+            } catch (Throwable e) {
+                LOGGER.error(
+                        "Error while pinning page number {} with number of pages {}. "
+                                + "(streamOffset:{}, remainingStreamBytes: {}) columnRanges:\n {}",
+                        i, numberOfPages, streamOffset, remainingStreamBytes, columnRanges);
+                throw e;
+            }
+        }
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
index d4b230a..1b4d09d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
@@ -90,7 +90,7 @@
         return buf.getInt(NEXT_LEAF_OFFSET);
     }
 
-    public long getMegaLeafNodeLengthInBytes() {
+    public int getMegaLeafNodeLengthInBytes() {
         return buf.getInt(MEGA_LEAF_NODE_LENGTH);
     }