[ASTERIXDB-3314][STO] Reduce buffer cache pressure on columnar
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Set the max merging components count to 4 for columnar datasets
- Fix columnar filter pages not being unpinned
- Allocate initial 32KB buffers for columnar writers
Change-Id: I809109b232bc5a5db0c47a52cb98c838ff55e27f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17965
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java
index cf2808e..38f7321 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java
@@ -25,8 +25,11 @@
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.util.StorageUtil;
public final class MultiTemporaryBufferBytesOutputStream extends AbstractMultiBufferBytesOutputStream {
+ private static final int INITIAL_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(32, StorageUtil.StorageUnit.KILOBYTE);
+
public MultiTemporaryBufferBytesOutputStream(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
super(multiPageOpRef);
}
@@ -38,6 +41,14 @@
@Override
protected ByteBuffer confiscateNewBuffer() throws HyracksDataException {
+ if (buffers.isEmpty()) {
+ /*
+ * One buffer on the house to avoid confiscating a whole page for a tiny stream.
+ * This protects the buffer cache from the pressure of confiscating pages for small columns. Think sparse
+ * columns, which may take only a few hundred bytes to write.
+ */
+ return ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
+ }
return multiPageOpRef.getValue().confiscateTemporary();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
index 8e0bc0c..93eca7c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
@@ -95,5 +95,9 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
+ <dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil-core</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
index 48bd180..3e72584 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
@@ -38,8 +38,11 @@
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public final class ColumnBTreeBulkloader extends BTreeNSMBulkLoader implements IColumnWriteMultiPageOp {
+ private static final Logger LOGGER = LogManager.getLogger();
private final List<CachedPage> columnsPages;
private final List<CachedPage> tempConfiscatedPages;
private final ColumnBTreeWriteLeafFrame columnarFrame;
@@ -48,6 +51,12 @@
private boolean setLowKey;
private int tupleCount;
+ // For logging
+ private int numberOfLeafNodes;
+ private int numberOfPagesInCurrentLeafNode;
+ private int maxNumberOfPagesForAColumn;
+ private int maxNumberOfPagesInALeafNode;
+
public ColumnBTreeBulkloader(float fillFactor, boolean verifyInput, IPageWriteCallback callback, ITreeIndex index,
ITreeIndexFrame leafFrame) throws HyracksDataException {
super(fillFactor, verifyInput, callback, index, leafFrame);
@@ -59,6 +68,12 @@
lowKey = new BTreeSplitKey(tupleWriter.createTupleReference());
lowKey.getTuple().setFieldCount(cmp.getKeyFieldCount());
setLowKey = true;
+
+ // For logging. Starts with 1 for page0
+ numberOfPagesInCurrentLeafNode = 1;
+ maxNumberOfPagesForAColumn = 0;
+ maxNumberOfPagesInALeafNode = 0;
+ numberOfLeafNodes = 1;
}
@Override
@@ -118,9 +133,14 @@
for (ICachedPage page : tempConfiscatedPages) {
bufferCache.returnPage(page, false);
}
+
+ // For logging
+ int numberOfTempConfiscatedPages = tempConfiscatedPages.size();
tempConfiscatedPages.clear();
//Where Page0 and columns pages will be written
super.end();
+
+ log("Finished");
}
@Override
@@ -156,6 +176,12 @@
splitKey.setRightPage(leafFrontier.pageId);
setLowKey = true;
tupleCount = 0;
+
+ // For logging
+ maxNumberOfPagesInALeafNode = Math.max(maxNumberOfPagesInALeafNode, numberOfPagesInCurrentLeafNode);
+ // Starts with 1 for page0
+ numberOfPagesInCurrentLeafNode = 1;
+ numberOfLeafNodes++;
}
@Override
@@ -172,6 +198,12 @@
for (ICachedPage c : columnsPages) {
write(c);
}
+
+ // For logging
+ int numberOfPagesInPersistedColumn = columnsPages.size();
+ maxNumberOfPagesForAColumn = Math.max(maxNumberOfPagesForAColumn, numberOfPagesInPersistedColumn);
+ numberOfPagesInCurrentLeafNode += numberOfPagesInPersistedColumn;
+
columnsPages.clear();
}
@@ -185,6 +217,9 @@
bufferCache.returnPage(page, false);
}
super.abort();
+
+ // For logging
+ log("Aborted");
}
private void setSplitKey(ISplitKey splitKey, ITupleReference tuple) {
@@ -193,6 +228,18 @@
tupleWriter.writeTupleFields(tuple, 0, cmp.getKeyFieldCount(), splitKey.getBuffer().array(), 0);
}
+ private void log(String status) {
+ if (!LOGGER.isDebugEnabled()) {
+ return;
+ }
+
+ int numberOfTempConfiscatedPages = tempConfiscatedPages.size();
+ LOGGER.debug(
+ "{} columnar bulkloader used leafNodes: {}, tempPagesAllocated: {}, maxPagesPerColumn: {}, and maxLeafNodePages: {}",
+ status, numberOfLeafNodes, numberOfTempConfiscatedPages, maxNumberOfPagesForAColumn,
+ maxNumberOfPagesInALeafNode);
+ }
+
/*
* ***********************************************************
* IColumnWriteMultiPageOp
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
index fd726cd..39952df 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
@@ -256,8 +256,8 @@
@Override
public void doClose() throws HyracksDataException {
- frameTuple.close();
releasePages();
+ frameTuple.close();
page0 = null;
pred = null;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
index e638a4a..3923025 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
@@ -32,6 +32,9 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongSet;
+
public abstract class AbstractColumnTupleReference implements IColumnTupleIterator {
private static final Logger LOGGER = LogManager.getLogger();
private static final String UNSUPPORTED_OPERATION_MSG = "Operation is not supported for column tuples";
@@ -41,11 +44,15 @@
private final IColumnBufferProvider[] filterBufferProviders;
private final IColumnBufferProvider[] buffersProviders;
private final int numberOfPrimaryKeys;
- private int totalNumberOfMegaLeafNodes;
- private int numOfSkippedMegaLeafNodes;
private int endIndex;
protected int tupleIndex;
+ // For logging
+ private final LongSet pinnedPages;
+ private int totalNumberOfMegaLeafNodes;
+ private int numOfSkippedMegaLeafNodes;
+ private int maxNumberOfPinnedPages;
+
/**
* Column tuple reference
*
@@ -64,6 +71,7 @@
primaryKeyBufferProviders[i] = new ColumnSingleBufferProvider(i);
}
+ pinnedPages = new LongOpenHashSet();
int numberOfFilteredColumns = info.getNumberOfFilteredColumns();
filterBufferProviders = new IColumnBufferProvider[numberOfFilteredColumns];
for (int i = 0; i < numberOfFilteredColumns; i++) {
@@ -71,7 +79,7 @@
if (columnIndex < 0) {
filterBufferProviders[i] = DummyColumnBufferProvider.INSTANCE;
} else if (columnIndex >= numberOfPrimaryKeys) {
- filterBufferProviders[i] = new ColumnMultiBufferProvider(columnIndex, multiPageOp);
+ filterBufferProviders[i] = new ColumnMultiBufferProvider(columnIndex, multiPageOp, pinnedPages);
} else {
filterBufferProviders[i] = new ColumnSingleBufferProvider(columnIndex);
}
@@ -82,7 +90,7 @@
for (int i = 0; i < numberOfRequestedColumns; i++) {
int columnIndex = info.getColumnIndex(i);
if (columnIndex >= numberOfPrimaryKeys) {
- buffersProviders[i] = new ColumnMultiBufferProvider(columnIndex, multiPageOp);
+ buffersProviders[i] = new ColumnMultiBufferProvider(columnIndex, multiPageOp, pinnedPages);
} else {
buffersProviders[i] = DummyColumnBufferProvider.INSTANCE;
}
@@ -116,6 +124,8 @@
int numberOfTuples = frame.getTupleCount();
//Start new page and check whether we should skip reading non-key columns or not
boolean readColumnPages = startNewPage(pageZero, frame.getNumberOfColumns(), numberOfTuples);
+ //Release previous pinned pages if any
+ unpinColumnsPages();
/*
* When startIndex = 0, a call to next() is performed to get the information of the PK
* and 0 skips will be performed. If startIndex (for example) is 5, a call to next() will be performed
@@ -125,8 +135,6 @@
if (readColumnPages) {
for (int i = 0; i < filterBufferProviders.length; i++) {
IColumnBufferProvider provider = filterBufferProviders[i];
- //Release previous pinned pages if any
- provider.releaseAll();
provider.reset(frame);
startColumnFilter(provider, i, numberOfTuples);
}
@@ -135,11 +143,10 @@
if (readColumnPages && evaluateFilter()) {
for (int i = 0; i < buffersProviders.length; i++) {
IColumnBufferProvider provider = buffersProviders[i];
- //Release previous pinned pages if any
- provider.releaseAll();
provider.reset(frame);
startColumn(provider, i, numberOfTuples);
}
+
/*
* skipCount can be < 0 for cases when the tuples in the range [0, startIndex] are all anti-matters.
* Consequently, tuples in the range [0, startIndex] do not have any non-key columns. Thus, the returned
@@ -150,6 +157,7 @@
} else {
numOfSkippedMegaLeafNodes++;
}
+
totalNumberOfMegaLeafNodes++;
}
@@ -232,17 +240,30 @@
@Override
public final void unpinColumnsPages() throws HyracksDataException {
+ for (int i = 0; i < filterBufferProviders.length; i++) {
+ filterBufferProviders[i].releaseAll();
+ }
+
for (int i = 0; i < buffersProviders.length; i++) {
buffersProviders[i].releaseAll();
}
+
+ maxNumberOfPinnedPages = Math.max(maxNumberOfPinnedPages, pinnedPages.size());
+ pinnedPages.clear();
}
@Override
public final void close() {
- if (LOGGER.isInfoEnabled() && numOfSkippedMegaLeafNodes > 0) {
- LOGGER.info("Filtered {} disk mega-leaf nodes out of {} in total", numOfSkippedMegaLeafNodes,
+ if (!LOGGER.isDebugEnabled()) {
+ return;
+ }
+
+ if (numOfSkippedMegaLeafNodes > 0) {
+ LOGGER.debug("Filtered {} disk mega-leaf nodes out of {} in total", numOfSkippedMegaLeafNodes,
totalNumberOfMegaLeafNodes);
}
+
+ LOGGER.debug("Max number of pinned pages is {}", maxNumberOfPinnedPages + 1);
}
/* *************************************************************
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
index 0c17d6b..34ec856 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
@@ -26,20 +26,25 @@
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import it.unimi.dsi.fastutil.longs.LongSet;
+
public final class ColumnMultiBufferProvider implements IColumnBufferProvider {
private final int columnIndex;
private final IColumnReadMultiPageOp multiPageOp;
private final Queue<ICachedPage> pages;
+ private final LongSet pinnedPages;
private int numberOfPages;
private int startPage;
private int startOffset;
private int length;
- public ColumnMultiBufferProvider(int columnIndex, IColumnReadMultiPageOp multiPageOp) {
+ public ColumnMultiBufferProvider(int columnIndex, IColumnReadMultiPageOp multiPageOp, LongSet pinnedPages) {
this.columnIndex = columnIndex;
this.multiPageOp = multiPageOp;
+ this.pinnedPages = pinnedPages;
pages = new ArrayDeque<>();
}
@@ -107,6 +112,7 @@
private ByteBuffer readNext() throws HyracksDataException {
ICachedPage columnPage = multiPageOp.pin(startPage++);
pages.add(columnPage);
+ pinnedPages.add(((CachedPage) columnPage).getDiskPageId());
return columnPage.getBuffer();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 9fcce8b..461d416 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -583,7 +583,9 @@
@Override
public void merge(ILSMIOOperation operation) throws HyracksDataException {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Started a merge operation for index: {}", lsmIndex);
+ MergeOperation mergeOp = (MergeOperation) operation;
+ LOGGER.debug("Started a merge operation (number of merging components {}) for index: {}",
+ mergeOp.getMergingComponents().size(), lsmIndex);
}
synchronized (opTracker) {
enterComponents(operation.getAccessor().getOpContext(), LSMOperationType.MERGE);