[ASTERIXDB-3601][STO] Fixing Merge failure

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
While bulk-loading during a merge, we calculate the present
columns in each leaf. However, there can be a case where
a cursor gets closed because all the tuples have been read.
Closing a range cursor releases the page, so the page can be reused.

While bulk-loading, even after the range cursor was closed, the
leaf was still being asked for the present set of columns. Since the
page had been reused, it contained a different buffer which, when
read, gave wrong column details.

Hence, fix this by calculating the info when reset happens
with a new leaf, which always comes before the cursor is closed.

Ext-ref: MB-67570
Change-Id: I87b3a084d01986dd5c2abd9452a2ad5619fbab15
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20038
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java
index d756a6c..a8d159f 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java
@@ -150,8 +150,9 @@
 
     @Override
     public void getAllColumns(BitSet presentColumns) {
-        int numberOfColumns = numberOfPresentColumns;
-        presentColumns.set(0, numberOfColumns);
+        // Don't ask for pageZeroBuf.getInt(NUMBER_OF_COLUMNS_OFFSET) here, as the cursor might have been closed
+        // and the cached page might have been recycled.
+        presentColumns.set(0, numberOfPresentColumns);
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/SparseColumnPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/SparseColumnPageZeroReader.java
index 5955d5e..3fd7a11 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/SparseColumnPageZeroReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/SparseColumnPageZeroReader.java
@@ -29,21 +29,26 @@
 
 public class SparseColumnPageZeroReader extends DefaultColumnPageZeroReader {
     private final Int2IntOpenHashMap columnIndexToRelativeColumnIndex;
+    private final BitSet presentColumnsIndices;
 
     public SparseColumnPageZeroReader() {
         columnIndexToRelativeColumnIndex = new Int2IntOpenHashMap();
+        presentColumnsIndices = new BitSet();
         columnIndexToRelativeColumnIndex.defaultReturnValue(-1);
     }
 
     @Override
     public void reset(ByteBuffer pageZeroBuf, int headerSize) {
         super.reset(pageZeroBuf, headerSize);
+        setPresentColumnsIndices();
         columnIndexToRelativeColumnIndex.clear();
+
     }
 
     @Override
     public void reset(ByteBuffer pageZeroBuf, int numberOfPresentColumns, int headerSize) {
         super.reset(pageZeroBuf, numberOfPresentColumns, headerSize);
+        setPresentColumnsIndices();
         columnIndexToRelativeColumnIndex.clear();
     }
 
@@ -126,8 +131,8 @@
         return relativeColumnIndex != -1;
     }
 
-    @Override
-    public void getAllColumns(BitSet presentColumns) {
+    private void setPresentColumnsIndices() {
+        presentColumnsIndices.clear();
         if (numberOfPresentColumns == 0) {
             return;
         }
@@ -137,12 +142,18 @@
 
         while (columnIndex < limit) {
             int column = pageZeroBuf.getInt(columnIndex);
-            presentColumns.set(column);
+            presentColumnsIndices.set(column);
             columnIndex += SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
         }
     }
 
     @Override
+    public void getAllColumns(BitSet presentColumns) {
+        // Merge the present-column indices computed during reset into the caller's BitSet
+        presentColumns.or(presentColumnsIndices);
+    }
+
+    @Override
     public int populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
         int columnIndex = getColumnIndex(0);
         for (int i = 0; i < numberOfPresentColumns; i++) {
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroReader.java
index d4ffbb4..8eaaed2 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroReader.java
@@ -53,6 +53,7 @@
     private int zerothSegmentMaxColumns;
     private int numberOfPageZeroSegments; // includes the zeroth segment
     private ByteBuffer pageZeroBuf;
+    private int numberOfColumns;
 
     private final VoidPointable offsetPointable;
 
@@ -77,6 +78,7 @@
         zerothSegmentReader.reset(pageZeroBuf, Math.min(zerothSegmentMaxColumns, getNumberOfPresentColumns()),
                 headerSize);
         numberOfPageZeroSegments = pageZeroBuf.getInt(NUMBER_OF_PAGE_ZERO_SEGMENTS_OFFSET);
+        numberOfColumns = pageZeroBuf.getInt(NUMBER_OF_COLUMNS_OFFSET);
     }
 
     @Override
@@ -220,7 +222,8 @@
 
     @Override
     public void getAllColumns(BitSet presentColumns) {
-        int numberOfColumns = getNumberOfPresentColumns();
+        // Don't ask for pageZeroBuf.getInt(NUMBER_OF_COLUMNS_OFFSET) here, as the cursor might have been closed
+        // and the cached page might have been recycled.
         presentColumns.set(0, numberOfColumns);
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroReader.java
index 035db5d..379d81c 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroReader.java
@@ -51,6 +51,8 @@
     private final int maxNumberOfColumnsInAPage;
     private final BitSet pageZeroSegmentsPages;
     private final Int2IntOpenHashMap columnIndexToRelativeColumnIndex;
+    private final VoidPointable offsetPointable;
+    private final BitSet presentColumnsIndices;
 
     private int maxColumnIndexInZerothSegment;
     private int numberOfColumnInZerothSegment;
@@ -58,10 +60,9 @@
     private int headerSize;
     private ByteBuffer pageZeroBuf;
 
-    private final VoidPointable offsetPointable;
-
     public SparseColumnMultiPageZeroReader(int bufferCapacity) {
         super();
+        presentColumnsIndices = new BitSet();
         zerothSegmentReader = new SparseColumnPageZeroReader();
         this.pageZeroSegmentsPages = new BitSet();
         this.maxNumberOfColumnsInAPage =
@@ -85,6 +86,7 @@
         headerSize = MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET + numberOfPageZeroSegments * Integer.BYTES;
         zerothSegmentReader.reset(pageZeroBuf, Math.min(numberOfColumnInZerothSegment, getNumberOfPresentColumns()),
                 headerSize);
+        setPresentColumnsIndices();
         columnIndexToRelativeColumnIndex.clear();
     }
 
@@ -276,24 +278,33 @@
         return findRelativeColumnIndex(columnIndex) != -1;
     }
 
-    @Override
-    public void getAllColumns(BitSet presentColumns) {
+    private void setPresentColumnsIndices() {
+        presentColumnsIndices.clear();
+        int numberOfPresentColumns = getNumberOfPresentColumns();
+        if (numberOfPresentColumns == 0) {
+            return;
+        }
         int columnOffsetStart = headerSize;
-        for (int i = 0; i < Math.min(getNumberOfPresentColumns(), numberOfColumnInZerothSegment); i++) {
+        for (int i = 0; i < Math.min(numberOfPresentColumns, numberOfColumnInZerothSegment); i++) {
             int columnIndex = pageZeroBuf.getInt(columnOffsetStart);
-            presentColumns.set(columnIndex);
+            presentColumnsIndices.set(columnIndex);
             columnOffsetStart += SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
         }
-        if (getNumberOfPresentColumns() > numberOfColumnInZerothSegment) {
+        if (numberOfPresentColumns > numberOfColumnInZerothSegment) {
             // read the rest of the columns from the segment stream
             int columnsInLastSegment = getNumberOfPresentColumns() - numberOfColumnInZerothSegment
                     - (numberOfPageZeroSegments - 2) * maxNumberOfColumnsInAPage;
-            segmentBuffers.readAllColumns(presentColumns, numberOfPageZeroSegments, maxNumberOfColumnsInAPage,
+            segmentBuffers.readAllColumns(presentColumnsIndices, numberOfPageZeroSegments, maxNumberOfColumnsInAPage,
                     columnsInLastSegment);
         }
     }
 
     @Override
+    public void getAllColumns(BitSet presentColumns) {
+        presentColumns.or(presentColumnsIndices);
+    }
+
+    @Override
     public ByteBuffer getPageZeroBuf() {
         throw new UnsupportedOperationException("This method is not supported for multi-page zero readers.");
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
index 87b35bd..e6170a2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
@@ -84,7 +84,7 @@
      * @param leafFrame to compute the ranges for
      */
     public void reset(ColumnBTreeReadLeafFrame leafFrame) throws HyracksDataException {
-        reset(leafFrame, EMPTY, EMPTY, EMPTY);
+        reset(leafFrame, EMPTY, EMPTY, EMPTY, false);
     }
 
     /**
@@ -94,7 +94,7 @@
      * @param plan      eviction plan
      */
     public void reset(ColumnBTreeReadLeafFrame leafFrame, BitSet plan) throws HyracksDataException {
-        reset(leafFrame, plan, EMPTY, EMPTY);
+        reset(leafFrame, plan, EMPTY, EMPTY, false);
     }
 
     /**
@@ -106,7 +106,7 @@
      * @param cloudOnlyColumns locked columns that cannot be read from a local disk
      */
     public void reset(ColumnBTreeReadLeafFrame leafFrame, BitSet requestedColumns, BitSet evictableColumns,
-            BitSet cloudOnlyColumns) throws HyracksDataException {
+            BitSet cloudOnlyColumns, boolean unPinPageZeroSegments) throws HyracksDataException {
         try {
             // Set leafFrame
             this.leafFrame = leafFrame;
@@ -167,8 +167,10 @@
             // to indicate the end
             columnsOrder[columnOrdinal] = -1;
         } finally {
-            //Unpin the not required segment pages
-            leafFrame.unPinNotRequiredPageZeroSegments();
+            if (unPinPageZeroSegments) {
+                //Unpin the not required segment pages
+                leafFrame.unPinNotRequiredPageZeroSegments();
+            }
         }
     }
 
@@ -213,7 +215,7 @@
      *
      * @param pageId page ID
      * @return true of the page should be read from the cloud, false otherwise
-     * @see #reset(ColumnBTreeReadLeafFrame, BitSet, BitSet, BitSet)
+     * @see #reset(ColumnBTreeReadLeafFrame, BitSet, BitSet, BitSet, boolean)
      */
     public boolean isCloudOnly(int pageId) {
         // Compute the relative page ID for this mega leaf node
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
index 181aa06..b499a50 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
@@ -158,7 +158,7 @@
             return;
         }
 
-        columnRanges.reset(leafFrame, projectedColumns, plan, cloudOnlyColumns);
+        columnRanges.reset(leafFrame, projectedColumns, plan, cloudOnlyColumns, true);
         int pageZeroId = leafFrame.getPageId();
         int numberOfPageZeroSegments = leafFrame.getNumberOfPageZeroSegments();
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
index ba4376f..2a9dd25 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
@@ -202,6 +202,7 @@
                             }
                         }
                     }
+                    segmentPagesTempHolder.clear();
                     context.unpin(page0, bcOpCtx);
                 }
             }