[ASTERIXDB-3314][STO] Reduce buffer cache pressure on columnar

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Cap the number of components merged at once to 4 in columnar datasets
- Fix a bug where columnar filter pages were not unpinned
- Allocate initial 32KB buffers for columnar writers

Change-Id: I809109b232bc5a5db0c47a52cb98c838ff55e27f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17965
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java
index cf2808e..38f7321 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java
@@ -25,8 +25,11 @@
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.util.StorageUtil;
 
 public final class MultiTemporaryBufferBytesOutputStream extends AbstractMultiBufferBytesOutputStream {
+    private static final int INITIAL_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(32, StorageUtil.StorageUnit.KILOBYTE);
+
     public MultiTemporaryBufferBytesOutputStream(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
         super(multiPageOpRef);
     }
@@ -38,6 +41,14 @@
 
     @Override
     protected ByteBuffer confiscateNewBuffer() throws HyracksDataException {
+        if (buffers.isEmpty()) {
+            /*
+             * One buffer on the house to avoid confiscating a whole page for a tiny stream.
+             * This avoids pressuring the buffer cache by confiscating pages for small columns. Think sparse
+             * columns, which may take only a few hundred bytes to write.
+             */
+            return ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
+        }
         return multiPageOpRef.getValue().confiscateTemporary();
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
index 8e0bc0c..93eca7c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
@@ -95,5 +95,9 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil-core</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
index 48bd180..3e72584 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
@@ -38,8 +38,11 @@
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 public final class ColumnBTreeBulkloader extends BTreeNSMBulkLoader implements IColumnWriteMultiPageOp {
+    private static final Logger LOGGER = LogManager.getLogger();
     private final List<CachedPage> columnsPages;
     private final List<CachedPage> tempConfiscatedPages;
     private final ColumnBTreeWriteLeafFrame columnarFrame;
@@ -48,6 +51,12 @@
     private boolean setLowKey;
     private int tupleCount;
 
+    // For logging
+    private int numberOfLeafNodes;
+    private int numberOfPagesInCurrentLeafNode;
+    private int maxNumberOfPagesForAColumn;
+    private int maxNumberOfPagesInALeafNode;
+
     public ColumnBTreeBulkloader(float fillFactor, boolean verifyInput, IPageWriteCallback callback, ITreeIndex index,
             ITreeIndexFrame leafFrame) throws HyracksDataException {
         super(fillFactor, verifyInput, callback, index, leafFrame);
@@ -59,6 +68,12 @@
         lowKey = new BTreeSplitKey(tupleWriter.createTupleReference());
         lowKey.getTuple().setFieldCount(cmp.getKeyFieldCount());
         setLowKey = true;
+
+        // For logging. Starts with 1 for page0
+        numberOfPagesInCurrentLeafNode = 1;
+        maxNumberOfPagesForAColumn = 0;
+        maxNumberOfPagesInALeafNode = 0;
+        numberOfLeafNodes = 1;
     }
 
     @Override
@@ -118,9 +133,14 @@
         for (ICachedPage page : tempConfiscatedPages) {
             bufferCache.returnPage(page, false);
         }
+
+        // For logging: must run before clear(), since log() reads tempConfiscatedPages.size()
+        log("Finished");
         tempConfiscatedPages.clear();
         //Where Page0 and columns pages will be written
         super.end();
+
+
     }
 
     @Override
@@ -156,6 +176,12 @@
         splitKey.setRightPage(leafFrontier.pageId);
         setLowKey = true;
         tupleCount = 0;
+
+        // For logging
+        maxNumberOfPagesInALeafNode = Math.max(maxNumberOfPagesInALeafNode, numberOfPagesInCurrentLeafNode);
+        // Starts with 1 for page0
+        numberOfPagesInCurrentLeafNode = 1;
+        numberOfLeafNodes++;
     }
 
     @Override
@@ -172,6 +198,12 @@
         for (ICachedPage c : columnsPages) {
             write(c);
         }
+
+        // For logging
+        int numberOfPagesInPersistedColumn = columnsPages.size();
+        maxNumberOfPagesForAColumn = Math.max(maxNumberOfPagesForAColumn, numberOfPagesInPersistedColumn);
+        numberOfPagesInCurrentLeafNode += numberOfPagesInPersistedColumn;
+
         columnsPages.clear();
     }
 
@@ -185,6 +217,9 @@
             bufferCache.returnPage(page, false);
         }
         super.abort();
+
+        // For logging
+        log("Aborted");
     }
 
     private void setSplitKey(ISplitKey splitKey, ITupleReference tuple) {
@@ -193,6 +228,18 @@
         tupleWriter.writeTupleFields(tuple, 0, cmp.getKeyFieldCount(), splitKey.getBuffer().array(), 0);
     }
 
+    private void log(String status) {
+        if (!LOGGER.isDebugEnabled()) {
+            return;
+        }
+
+        int numberOfTempConfiscatedPages = tempConfiscatedPages.size();
+        LOGGER.debug(
+                "{} columnar bulkloader used leafNodes: {}, tempPagesAllocated: {}, maxPagesPerColumn: {}, and maxLeafNodePages: {}",
+                status, numberOfLeafNodes, numberOfTempConfiscatedPages, maxNumberOfPagesForAColumn,
+                maxNumberOfPagesInALeafNode);
+    }
+
     /*
      * ***********************************************************
      * IColumnWriteMultiPageOp
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
index fd726cd..39952df 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
@@ -256,8 +256,8 @@
 
     @Override
     public void doClose() throws HyracksDataException {
-        frameTuple.close();
         releasePages();
+        frameTuple.close();
         page0 = null;
         pred = null;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
index e638a4a..3923025 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
@@ -32,6 +32,9 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongSet;
+
 public abstract class AbstractColumnTupleReference implements IColumnTupleIterator {
     private static final Logger LOGGER = LogManager.getLogger();
     private static final String UNSUPPORTED_OPERATION_MSG = "Operation is not supported for column tuples";
@@ -41,11 +44,15 @@
     private final IColumnBufferProvider[] filterBufferProviders;
     private final IColumnBufferProvider[] buffersProviders;
     private final int numberOfPrimaryKeys;
-    private int totalNumberOfMegaLeafNodes;
-    private int numOfSkippedMegaLeafNodes;
     private int endIndex;
     protected int tupleIndex;
 
+    // For logging
+    private final LongSet pinnedPages;
+    private int totalNumberOfMegaLeafNodes;
+    private int numOfSkippedMegaLeafNodes;
+    private int maxNumberOfPinnedPages;
+
     /**
      * Column tuple reference
      *
@@ -64,6 +71,7 @@
             primaryKeyBufferProviders[i] = new ColumnSingleBufferProvider(i);
         }
 
+        pinnedPages = new LongOpenHashSet();
         int numberOfFilteredColumns = info.getNumberOfFilteredColumns();
         filterBufferProviders = new IColumnBufferProvider[numberOfFilteredColumns];
         for (int i = 0; i < numberOfFilteredColumns; i++) {
@@ -71,7 +79,7 @@
             if (columnIndex < 0) {
                 filterBufferProviders[i] = DummyColumnBufferProvider.INSTANCE;
             } else if (columnIndex >= numberOfPrimaryKeys) {
-                filterBufferProviders[i] = new ColumnMultiBufferProvider(columnIndex, multiPageOp);
+                filterBufferProviders[i] = new ColumnMultiBufferProvider(columnIndex, multiPageOp, pinnedPages);
             } else {
                 filterBufferProviders[i] = new ColumnSingleBufferProvider(columnIndex);
             }
@@ -82,7 +90,7 @@
         for (int i = 0; i < numberOfRequestedColumns; i++) {
             int columnIndex = info.getColumnIndex(i);
             if (columnIndex >= numberOfPrimaryKeys) {
-                buffersProviders[i] = new ColumnMultiBufferProvider(columnIndex, multiPageOp);
+                buffersProviders[i] = new ColumnMultiBufferProvider(columnIndex, multiPageOp, pinnedPages);
             } else {
                 buffersProviders[i] = DummyColumnBufferProvider.INSTANCE;
             }
@@ -116,6 +124,8 @@
         int numberOfTuples = frame.getTupleCount();
         //Start new page and check whether we should skip reading non-key columns or not
         boolean readColumnPages = startNewPage(pageZero, frame.getNumberOfColumns(), numberOfTuples);
+        //Release previous pinned pages if any
+        unpinColumnsPages();
         /*
          * When startIndex = 0, a call to next() is performed to get the information of the PK
          * and 0 skips will be performed. If startIndex (for example) is 5, a call to next() will be performed
@@ -125,8 +135,6 @@
         if (readColumnPages) {
             for (int i = 0; i < filterBufferProviders.length; i++) {
                 IColumnBufferProvider provider = filterBufferProviders[i];
-                //Release previous pinned pages if any
-                provider.releaseAll();
                 provider.reset(frame);
                 startColumnFilter(provider, i, numberOfTuples);
             }
@@ -135,11 +143,10 @@
         if (readColumnPages && evaluateFilter()) {
             for (int i = 0; i < buffersProviders.length; i++) {
                 IColumnBufferProvider provider = buffersProviders[i];
-                //Release previous pinned pages if any
-                provider.releaseAll();
                 provider.reset(frame);
                 startColumn(provider, i, numberOfTuples);
             }
+
             /*
              * skipCount can be < 0 for cases when the tuples in the range [0, startIndex] are all anti-matters.
              * Consequently, tuples in the range [0, startIndex] do not have any non-key columns. Thus, the returned
@@ -150,6 +157,7 @@
         } else {
             numOfSkippedMegaLeafNodes++;
         }
+
         totalNumberOfMegaLeafNodes++;
     }
 
@@ -232,17 +240,30 @@
 
     @Override
     public final void unpinColumnsPages() throws HyracksDataException {
+        for (int i = 0; i < filterBufferProviders.length; i++) {
+            filterBufferProviders[i].releaseAll();
+        }
+
         for (int i = 0; i < buffersProviders.length; i++) {
             buffersProviders[i].releaseAll();
         }
+
+        maxNumberOfPinnedPages = Math.max(maxNumberOfPinnedPages, pinnedPages.size());
+        pinnedPages.clear();
     }
 
     @Override
     public final void close() {
-        if (LOGGER.isInfoEnabled() && numOfSkippedMegaLeafNodes > 0) {
-            LOGGER.info("Filtered {} disk mega-leaf nodes out of {} in total", numOfSkippedMegaLeafNodes,
+        if (!LOGGER.isDebugEnabled()) {
+            return;
+        }
+
+        if (numOfSkippedMegaLeafNodes > 0) {
+            LOGGER.debug("Filtered {} disk mega-leaf nodes out of {} in total", numOfSkippedMegaLeafNodes,
                     totalNumberOfMegaLeafNodes);
         }
+
+        LOGGER.debug("Max number of pinned pages is {}", maxNumberOfPinnedPages + 1);
     }
 
     /* *************************************************************
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
index 0c17d6b..34ec856 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
@@ -26,20 +26,25 @@
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
 import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 
+import it.unimi.dsi.fastutil.longs.LongSet;
+
 public final class ColumnMultiBufferProvider implements IColumnBufferProvider {
     private final int columnIndex;
     private final IColumnReadMultiPageOp multiPageOp;
     private final Queue<ICachedPage> pages;
+    private final LongSet pinnedPages;
     private int numberOfPages;
     private int startPage;
     private int startOffset;
     private int length;
 
-    public ColumnMultiBufferProvider(int columnIndex, IColumnReadMultiPageOp multiPageOp) {
+    public ColumnMultiBufferProvider(int columnIndex, IColumnReadMultiPageOp multiPageOp, LongSet pinnedPages) {
         this.columnIndex = columnIndex;
         this.multiPageOp = multiPageOp;
+        this.pinnedPages = pinnedPages;
         pages = new ArrayDeque<>();
     }
 
@@ -107,6 +112,7 @@
     private ByteBuffer readNext() throws HyracksDataException {
         ICachedPage columnPage = multiPageOp.pin(startPage++);
         pages.add(columnPage);
+        pinnedPages.add(((CachedPage) columnPage).getDiskPageId());
         return columnPage.getBuffer();
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 9fcce8b..461d416 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -583,7 +583,9 @@
     @Override
     public void merge(ILSMIOOperation operation) throws HyracksDataException {
         if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Started a merge operation for index: {}", lsmIndex);
+            MergeOperation mergeOp = (MergeOperation) operation;
+            LOGGER.debug("Started a merge operation (number of merging components {}) for index: {}",
+                    mergeOp.getMergingComponents().size(), lsmIndex);
         }
         synchronized (opTracker) {
             enterComponents(operation.getAccessor().getOpContext(), LSMOperationType.MERGE);