[ASTERIXDB-3433][STO] Capping read network calls to cloud
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Merge page ranges to reduce the number of network
calls to object storage while minimizing unwanted
fetches.
Pages fetched only to bridge the gap between two
merged ranges are not pinned in the buffer cache.
Change-Id: I343abcc5d2c2f1e65e8566e2a346e0a66da11d17
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18367
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ritik Raj <raj.ritik9835@gmail.com>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
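
For readers skimming the diff, the merge policy can be sketched standalone: greedily merge adjacent ranges separated by the smallest gaps until at most N ranges remain. Below is a simplified, self-contained illustration; the class and method names are hypothetical and do not mirror the patch's API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical, simplified illustration of the greedy merge in this patch:
// repeatedly merge the two adjacent ranges separated by the smallest gap
// until at most maxRanges ranges remain.
final class GreedyRangeMergeSketch {
    static List<int[]> mergeGreedily(List<int[]> ranges, int maxRanges) {
        // gaps entries: {gap between ranges i and i+1, i}
        PriorityQueue<int[]> gaps = new PriorityQueue<>(Comparator.comparingInt((int[] g) -> g[0]));
        for (int i = 0; i + 1 < ranges.size(); i++) {
            gaps.add(new int[] { ranges.get(i + 1)[0] - ranges.get(i)[1], i });
        }
        boolean[] mergedWithNext = new boolean[ranges.size()];
        int merges = ranges.size() - maxRanges;
        for (int m = 0; m < merges; m++) {
            mergedWithNext[gaps.poll()[1]] = true; // cheapest gap first
        }
        List<int[]> out = new ArrayList<>();
        for (int i = 0; i < ranges.size(); i++) {
            int start = ranges.get(i)[0];
            while (i < ranges.size() - 1 && mergedWithNext[i]) {
                i++; // absorb the next range into the current one
            }
            out.add(new int[] { start, ranges.get(i)[1] });
        }
        return out;
    }

    public static void main(String[] args) {
        List<int[]> ranges = List.of(new int[] { 1, 3 }, new int[] { 5, 7 },
                new int[] { 8, 12 }, new int[] { 16, 19 }, new int[] { 23, 26 });
        for (int[] r : mergeGreedily(ranges, 3)) {
            System.out.println(r[0] + ".." + r[1]); // 1..12, 16..19, 23..26
        }
    }
}
```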
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
index 119984c..fef150a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
@@ -46,6 +46,8 @@
@NotThreadSafe
public final class CloudColumnReadContext implements IColumnReadContext {
+ public static final int MAX_RANGES_COUNT = 3;
+
private final ColumnProjectorType operation;
private final IPhysicalDrive drive;
private final BitSet plan;
@@ -53,6 +55,7 @@
private final ColumnRanges columnRanges;
private final CloudMegaPageReadContext columnCtx;
private final BitSet projectedColumns;
+ private final MergedPageRanges mergedPageRanges;
public CloudColumnReadContext(IColumnProjectionInfo projectionInfo, IPhysicalDrive drive, BitSet plan) {
this.operation = projectionInfo.getProjectorType();
@@ -62,6 +65,7 @@
cloudOnlyColumns = new BitSet();
columnCtx = new CloudMegaPageReadContext(operation, columnRanges, drive);
projectedColumns = new BitSet();
+ mergedPageRanges = new MergedPageRanges(columnCtx, MAX_RANGES_COUNT);
if (operation == QUERY || operation == MODIFY) {
for (int i = 0; i < projectionInfo.getNumberOfProjectedColumns(); i++) {
int columnIndex = projectionInfo.getColumnIndex(i);
@@ -138,13 +142,11 @@
private void pinAll(int fileId, int pageZeroId, int numberOfPages, IBufferCache bufferCache)
throws HyracksDataException {
- columnCtx.pin(bufferCache, fileId, pageZeroId, 1, numberOfPages);
+ columnCtx.pin(bufferCache, fileId, pageZeroId, 1, numberOfPages, numberOfPages, MergedPageRanges.EMPTY);
}
private void pinProjected(int fileId, int pageZeroId, IBufferCache bufferCache) throws HyracksDataException {
- // TODO What if every other page is requested. That would do N/2 request, where N is the number of pages.
- // TODO This should be optimized in a way that minimizes the number of requests
-
+ mergedPageRanges.reset();
int[] columnsOrder = columnRanges.getColumnsOrder();
int i = 0;
int columnIndex = columnsOrder[i];
@@ -182,9 +184,10 @@
+ columnRanges.getTotalNumberOfPages());
}
- int numberOfPages = lastPageIdx - firstPageIdx + 1;
- columnCtx.pin(bufferCache, fileId, pageZeroId, firstPageIdx, numberOfPages);
+ mergedPageRanges.addRange(firstPageIdx, lastPageIdx);
}
+ // pin the merged page ranges
+ mergedPageRanges.pin(fileId, pageZeroId, bufferCache);
}
@Override
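
To see the trade-off this change makes, compare the number of cloud reads before and after merging for the ranges exercised by the unit test below; pages streamed only to bridge gaps are discarded rather than pinned. A back-of-the-envelope sketch, illustrative only:

```java
import java.util.List;

// Back-of-the-envelope sketch (illustrative only) of the trade-off in this
// patch: fewer cloud reads at the cost of streaming gap pages that are
// never pinned in the buffer cache.
public final class ReadTradeoffSketch {
    public static void main(String[] args) {
        // ranges from the unit test below: {1-3, 5-7, 8-12, 16-19, 23-26}
        List<int[]> before = List.of(new int[] { 1, 3 }, new int[] { 5, 7 },
                new int[] { 8, 12 }, new int[] { 16, 19 }, new int[] { 23, 26 });
        // after merging down to MAX_RANGES_COUNT = 3 ranges
        List<int[]> after = List.of(new int[] { 1, 12 }, new int[] { 16, 19 }, new int[] { 23, 26 });
        System.out.println("cloud reads: " + before.size() + " -> " + after.size()); // 5 -> 3
        int wanted = before.stream().mapToInt(r -> r[1] - r[0] + 1).sum();
        int streamed = after.stream().mapToInt(r -> r[1] - r[0] + 1).sum();
        System.out.println("unwanted pages streamed: " + (streamed - wanted)); // 1 (page 4)
    }
}
```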
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
index cd1e8ed..ed6e83f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
@@ -25,6 +25,7 @@
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.List;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -54,6 +55,8 @@
private final List<ICachedPage> pinnedPages;
private int numberOfContiguousPages;
+ // For logging: the number of pages actually requested (excluding gap pages)
+ private int numberOfWantedPages;
private int pageCounter;
private InputStream gapStream;
@@ -68,12 +71,13 @@
pinnedPages = new ArrayList<>();
}
- void pin(IBufferCache bufferCache, int fileId, int pageZeroId, int start, int numberOfPages)
- throws HyracksDataException {
+ void pin(IBufferCache bufferCache, int fileId, int pageZeroId, int start, int numberOfPages,
+ int numberOfWantedPages, BitSet unwantedPages) throws HyracksDataException {
closeStream();
this.numberOfContiguousPages = numberOfPages;
+ this.numberOfWantedPages = numberOfWantedPages;
pageCounter = 0;
- doPin(bufferCache, fileId, pageZeroId, start, numberOfPages);
+ doPin(bufferCache, fileId, pageZeroId, start, numberOfPages, numberOfWantedPages, unwantedPages);
}
@Override
@@ -169,6 +173,17 @@
ByteBuffer buffer = header.getBuffer();
buffer.position(0);
+ // If the stream includes unwanted (gap) pages, the streamOffset may
+ // still point at a page that precedes the current page. In that case,
+ // skip the intervening bytes before reading.
+ // e.g., if pageId(cPage) = 7 and streamOffset points at page 5,
+ // we must skip the compressed sizes of pages 5 and 6.
+ if (cPage.getCompressedPageOffset() > streamOffset) {
+ skipBytes(cPage.getCompressedPageOffset() - streamOffset);
+ }
+
try {
while (buffer.remaining() > 0) {
int length = stream.read(buffer.array(), buffer.position(), buffer.remaining());
@@ -208,8 +223,9 @@
streamOffset = offset;
LOGGER.info(
"Cloud stream read for pageId={} starting from pageCounter={} out of "
- + "numberOfContiguousPages={} (streamOffset = {}, remainingStreamBytes = {})",
- pageId, pageCounter, numberOfContiguousPages, streamOffset, remainingStreamBytes);
+ + "numberOfContiguousPages={} with numberOfWantedPages={}"
+ + " (streamOffset = {}, remainingStreamBytes = {})",
+ pageId, pageCounter, numberOfContiguousPages, numberOfWantedPages, streamOffset, remainingStreamBytes);
ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
gapStream = cloudIOManager.cloudRead(fileHandle.getFileHandle(), offset, length);
@@ -217,6 +233,23 @@
return gapStream;
}
+ private void skipBytes(long length) throws HyracksDataException {
+ if (gapStream == null) {
+ return;
+ }
+
+ try {
+ long lengthToSkip = length;
+ // InputStream.skip may skip fewer bytes than requested, so loop until done
+ while (length > 0) {
+ length -= gapStream.skip(length);
+ }
+ streamOffset += lengthToSkip;
+ remainingStreamBytes -= lengthToSkip;
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
private void skipStreamIfOpened(CachedPage cPage) throws HyracksDataException {
if (gapStream == null) {
return;
@@ -234,18 +267,22 @@
}
}
- private void doPin(IBufferCache bufferCache, int fileId, int pageZeroId, int start, int numberOfPages)
- throws HyracksDataException {
+ private void doPin(IBufferCache bufferCache, int fileId, int pageZeroId, int start, int numberOfPages,
+ int numberOfWantedPages, BitSet unwantedPages) throws HyracksDataException {
for (int i = start; i < start + numberOfPages; i++) {
- long dpid = BufferedFileHandle.getDiskPageId(fileId, pageZeroId + i);
+ int pageId = pageZeroId + i;
+ long dpid = BufferedFileHandle.getDiskPageId(fileId, pageId);
try {
- pinnedPages.add(bufferCache.pin(dpid, this));
+ if (!unwantedPages.get(pageId)) {
+ pinnedPages.add(bufferCache.pin(dpid, this));
+ }
pageCounter++;
} catch (Throwable e) {
LOGGER.error(
- "Error while pinning page number {} with number of pages {}. "
+ "Error while pinning page number {} with number of pages streamed {}, "
+ + "with actually wanted number of pages {}"
+ "(streamOffset:{}, remainingStreamBytes: {}) columnRanges:\n {}",
- i, numberOfPages, streamOffset, remainingStreamBytes, columnRanges);
+ i, numberOfPages, numberOfWantedPages, streamOffset, remainingStreamBytes, columnRanges);
throw e;
}
}
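
The new skipBytes above loops because InputStream.skip may skip fewer bytes than requested. Here is a minimal standalone sketch of the same skip-until-done pattern, with a read() fallback for streams whose skip() can return 0 before EOF; skipFully is a hypothetical helper, not the patch's code:

```java
import java.io.IOException;
import java.io.InputStream;

// Minimal sketch of the skip-until-done pattern used by skipBytes above.
// InputStream.skip may skip fewer bytes than requested (or 0), so callers
// must loop; the read() fallback guards against streams whose skip()
// returns 0 before EOF. Hypothetical helper, not part of the patch.
final class StreamSkipSketch {
    static void skipFully(InputStream in, long n) throws IOException {
        while (n > 0) {
            long skipped = in.skip(n);
            if (skipped == 0) {
                if (in.read() < 0) {
                    throw new IOException("Unexpected end of stream while skipping");
                }
                skipped = 1; // consumed one byte via read()
            }
            n -= skipped;
        }
    }
}
```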
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRanges.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRanges.java
new file mode 100644
index 0000000..c0c2fc9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRanges.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
+
+import java.util.BitSet;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IntPairUtil;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.longs.LongArrayPriorityQueue;
+import it.unimi.dsi.fastutil.longs.LongPriorityQueue;
+
+/**
+ * Merges the given page ranges so that at most N ranges remain.
+ * Merging is greedy: adjacent ranges separated by the smallest gaps are merged first.
+ */
+public class MergedPageRanges {
+ public static final BitSet EMPTY = new BitSet();
+ private final CloudMegaPageReadContext columnCtx;
+ private final int numRequiredRanges;
+ private final IntList pageRanges;
+ private final LongPriorityQueue gapRanges;
+ // indicates the index of the ranges which are merged
+ private final BitSet mergedIndex = new BitSet();
+ // indicates a page is requested or not
+ private final BitSet unwantedPages = new BitSet();
+ // indicates the extra pages got included while a merge
+ private int currentIndex = 0;
+ private int numRanges;
+
+ MergedPageRanges(CloudMegaPageReadContext columnCtx, int numRequiredRanges) {
+ this.numRequiredRanges = numRequiredRanges;
+ this.pageRanges = new IntArrayList(40);
+ this.gapRanges = new LongArrayPriorityQueue(IntPairUtil.FIRST_COMPARATOR);
+ this.columnCtx = columnCtx;
+ this.numRanges = 0;
+ }
+
+ public void reset() {
+ mergedIndex.clear();
+ pageRanges.clear();
+ gapRanges.clear();
+ numRanges = 0;
+ currentIndex = 0;
+ }
+
+ public void addRange(int rangeStart, int rangeEnd) {
+ pageRanges.add(rangeStart);
+ pageRanges.add(rangeEnd);
+ numRanges++;
+ }
+
+ public void mergeRanges() {
+ // number of merges needed = numRanges - numRequiredRanges
+ int merges = numRanges - numRequiredRanges;
+ for (int i = 2; i < pageRanges.size(); i += 2) {
+ int previousRangeEnd = pageRanges.getInt(i - 1);
+ int currentRangeStart = pageRanges.getInt(i);
+ // this could be optimized to enqueue only "merges" gaps,
+ // but it makes little difference since the number of ranges is small
+ long gap = IntPairUtil.of(currentRangeStart - previousRangeEnd, i / 2);
+ gapRanges.enqueue(gap);
+ }
+
+ int count = 0;
+ while (count < merges) {
+ // extract the lower 32 bits for the index.
+ int index = IntPairUtil.getSecond(gapRanges.dequeueLong());
+ // set bits [index - 1, index] to mark that
+ // ranges index - 1 and index are merged
+ mergedIndex.set(index - 1, index + 1);
+ count++;
+ }
+ }
+
+ public void pin(int fileId, int pageZeroId, IBufferCache bufferCache) throws HyracksDataException {
+ // numRanges is already within the threshold, so no merge is needed
+ if (numRanges <= numRequiredRanges) {
+ pinWithoutMerge(fileId, pageZeroId, bufferCache);
+ return;
+ }
+ pinWithMerge(fileId, pageZeroId, bufferCache);
+ }
+
+ private void pinWithoutMerge(int fileId, int pageZeroId, IBufferCache bufferCache) throws HyracksDataException {
+ for (int pageIndex = 1; pageIndex < pageRanges.size(); pageIndex += 2) {
+ int lastPageIndex = pageRanges.getInt(pageIndex);
+ int firstPageIndex = pageRanges.getInt(pageIndex - 1);
+ int numberOfPages = lastPageIndex - firstPageIndex + 1;
+ columnCtx.pin(bufferCache, fileId, pageZeroId, firstPageIndex, numberOfPages, numberOfPages, EMPTY);
+ }
+ }
+
+ private void pinWithMerge(int fileId, int pageZeroId, IBufferCache bufferCache) throws HyracksDataException {
+ // merge the ranges down to numRequiredRanges
+ mergeRanges();
+ // go through the merged ranges and pin each one
+ int rangeCnt = 0;
+ while (rangeCnt < numRequiredRanges) {
+ unwantedPages.clear();
+ long mergedRange = getNextRange();
+
+ int firstRangeIdx = IntPairUtil.getFirst(mergedRange);
+ int lastRangeIdx = IntPairUtil.getSecond(mergedRange);
+
+ // the ranges are flattened into pageRanges,
+ // so the i-th range occupies indices [2*i, 2*i + 1]
+ int firstRangeStart = pageRanges.getInt(2 * firstRangeIdx);
+ int firstRangeEnd = pageRanges.getInt(2 * firstRangeIdx + 1);
+ int lastRangeStart = pageRanges.getInt(2 * lastRangeIdx);
+ int lastRangeEnd = pageRanges.getInt(2 * lastRangeIdx + 1);
+
+ int numberOfPages = lastRangeEnd - firstRangeStart + 1;
+ // The number of unwanted pages is zero when there is a single range (i.e., no merge)
+ boolean areUnwantedPages = firstRangeIdx != lastRangeIdx;
+ // or when no extra page is fetched, e.g., [1 2] [3 4].
+ // For [ 1 2 ] [ 4 5 ] [ 7 8 ] -> [ 1 8 ] ( fromIndex = 0, toIndex = 2 ):
+ // numberOfUnwantedPages = (4 - 2 - 1) + (7 - 5 - 1) = 2
+ areUnwantedPages = areUnwantedPages && (lastRangeStart - firstRangeEnd > 1);
+ int numberOfUnwantedPages = 0;
+ if (areUnwantedPages) {
+ // iterate through the merged ranges and mark the gap pages
+ for (int fromIndex = firstRangeIdx; fromIndex < lastRangeIdx; fromIndex++) {
+ // gap = V(2 * (fromIndex + 1)) - V(2 * fromIndex + 1),
+ // where V(i) = pageRanges[i]
+ int fromRangeEnd = pageRanges.getInt(2 * fromIndex + 1);
+ int toRangeStart = pageRanges.getInt(2 * (fromIndex + 1));
+ // fromRangeEnd != toRangeStart, as they would have been merged already
+ int rangeGap = (fromRangeEnd == toRangeStart) ? 0 : toRangeStart - fromRangeEnd - 1;
+ if (rangeGap > 0) {
+ unwantedPages.set(fromRangeEnd + 1, toRangeStart);
+ }
+ numberOfUnwantedPages += rangeGap;
+ }
+ }
+
+ columnCtx.pin(bufferCache, fileId, pageZeroId, firstRangeStart, numberOfPages,
+ numberOfPages - numberOfUnwantedPages, unwantedPages);
+ rangeCnt++;
+ }
+ }
+
+ // package-private for MergedPageRangesTest
+ long getNextRange() {
+ int fromIndex = currentIndex;
+ int endIndex = currentIndex;
+ int toIndex;
+
+ // advance while the bit is set, i.e., while consecutive
+ // ranges have been merged into a single range
+ while (endIndex < numRanges && mergedIndex.get(endIndex)) {
+ endIndex++;
+ }
+
+ if (fromIndex == endIndex) {
+ currentIndex = endIndex + 1;
+ toIndex = endIndex;
+ } else {
+ currentIndex = endIndex;
+ toIndex = endIndex - 1;
+ }
+
+ return IntPairUtil.of(fromIndex, toIndex);
+ }
+}
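
MergedPageRanges relies on IntPairUtil to pack an (int, int) pair into a single long. Assuming the usual encoding, with the first value in the high 32 bits and the second in the low 32 bits (consistent with the "extract the lower 32 bits" comment above), the scheme looks roughly like the sketch below; this is not the actual IntPairUtil source.

```java
// Sketch of the int-pair-in-a-long encoding that MergedPageRanges relies
// on, assuming "first" occupies the high 32 bits and "second" the low 32
// bits. Not the actual IntPairUtil source.
final class IntPairSketch {
    static long of(int first, int second) {
        return ((long) first << 32) | (second & 0xFFFFFFFFL);
    }

    static int getFirst(long pair) {
        return (int) (pair >> 32);
    }

    static int getSecond(long pair) {
        return (int) pair;
    }

    public static void main(String[] args) {
        long gap = of(4, 3); // a gap of 4 pages, between ranges 2 and 3
        System.out.println(getFirst(gap) + ", " + getSecond(gap)); // 4, 3
        // A comparator on the high half (as IntPairUtil.FIRST_COMPARATOR
        // appears to be) makes a min-priority queue pop the smallest gap.
    }
}
```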
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRangesTest.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRangesTest.java
new file mode 100644
index 0000000..169c14d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRangesTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
+
+import java.util.List;
+
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IntPairUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import it.unimi.dsi.fastutil.Pair;
+
+public class MergedPageRangesTest {
+ private MergedPageRanges mergedPageRanges;
+ private final CloudMegaPageReadContext cloudMegaPageReadContext = null;
+
+ @Test
+ public void mergePageRanges1() {
+ int[] pageRanges = new int[] { 1, 3, 5, 7, 8, 12, 16, 19, 23, 26 };
+
+ int requiredRangeCount = 3;
+ mergedPageRanges = new MergedPageRanges(cloudMegaPageReadContext, requiredRangeCount);
+ for (int i = 0; i < pageRanges.length; i += 2) {
+ mergedPageRanges.addRange(pageRanges[i], pageRanges[i + 1]);
+ }
+
+ mergedPageRanges.mergeRanges();
+ // the gaps between consecutive ranges are (2, 1, 4, 4);
+ // since 3 ranges are required, 5 - 3 = 2 merges are done,
+ // hence the resultant ranges := ( 0, 2 ), ( 3, 3 ), ( 4, 4 )
+ List<Pair<Integer, Integer>> ranges = List.of(Pair.of(0, 2), Pair.of(3, 3), Pair.of(4, 4));
+
+ for (int i = 0; i < requiredRangeCount; i++) {
+ long nextRange = mergedPageRanges.getNextRange();
+ Assert.assertEquals(ranges.get(i).first().intValue(), IntPairUtil.getFirst(nextRange));
+ Assert.assertEquals(ranges.get(i).second().intValue(), IntPairUtil.getSecond(nextRange));
+ }
+ }
+
+ @Test
+ public void allMerges() {
+ int[] pageRanges = new int[] { 1, 3, 5, 7, 8, 12, 16, 19, 23, 26 };
+
+ int requiredRangeCount = 1;
+ mergedPageRanges = new MergedPageRanges(cloudMegaPageReadContext, requiredRangeCount);
+ for (int i = 0; i < pageRanges.length; i += 2) {
+ mergedPageRanges.addRange(pageRanges[i], pageRanges[i + 1]);
+ }
+
+ mergedPageRanges.mergeRanges();
+ // the gaps between consecutive ranges are (2, 1, 4, 4);
+ // since 1 range is required, 5 - 1 = 4 merges are done,
+ // hence the resultant range := ( 0, 4 )
+ List<Pair<Integer, Integer>> ranges = List.of(Pair.of(0, 4));
+
+ for (int i = 0; i < requiredRangeCount; i++) {
+ long nextRange = mergedPageRanges.getNextRange();
+ Assert.assertEquals(ranges.get(i).first().intValue(), IntPairUtil.getFirst(nextRange));
+ Assert.assertEquals(ranges.get(i).second().intValue(), IntPairUtil.getSecond(nextRange));
+ }
+ }
+
+ @Test
+ public void noMerges() {
+ int[] pageRanges = new int[] { 1, 3, 5, 7, 8, 12, 16, 19, 23, 26 };
+
+ int requiredRangeCount = 8;
+ mergedPageRanges = new MergedPageRanges(cloudMegaPageReadContext, requiredRangeCount);
+ for (int i = 0; i < pageRanges.length; i += 2) {
+ mergedPageRanges.addRange(pageRanges[i], pageRanges[i + 1]);
+ }
+
+ mergedPageRanges.mergeRanges();
+ // the gaps between consecutive ranges are (2, 1, 4, 4);
+ // since 8 ranges are required but only 5 exist, no merge is done
+ List<Pair<Integer, Integer>> ranges =
+ List.of(Pair.of(0, 0), Pair.of(1, 1), Pair.of(2, 2), Pair.of(3, 3), Pair.of(4, 4));
+
+ for (int i = 0; i < ranges.size(); i++) {
+ long nextRange = mergedPageRanges.getNextRange();
+ Assert.assertEquals(ranges.get(i).first().intValue(), IntPairUtil.getFirst(nextRange));
+ Assert.assertEquals(ranges.get(i).second().intValue(), IntPairUtil.getSecond(nextRange));
+ }
+ }
+}
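
As a hand-check of the gap bookkeeping in pinWithMerge, the merged range (0, 2) from mergePageRanges1 covers the ranges {1-3, 5-7, 8-12}: it spans pages 1..12, and only page 4 is unwanted, since the 7 -> 8 gap is zero. A standalone sketch mirroring that arithmetic (not part of the patch):

```java
import java.util.BitSet;
import java.util.List;

// Standalone sketch mirroring pinWithMerge's gap bookkeeping for the
// merged range (0, 2) from mergePageRanges1: ranges {1-3, 5-7, 8-12}
// merge into pages 1..12, and only page 4 is unwanted.
public final class UnwantedPagesSketch {
    public static void main(String[] args) {
        List<int[]> ranges = List.of(new int[] { 1, 3 }, new int[] { 5, 7 }, new int[] { 8, 12 });
        BitSet unwanted = new BitSet();
        int numberOfUnwantedPages = 0;
        for (int i = 0; i + 1 < ranges.size(); i++) {
            int fromEnd = ranges.get(i)[1];
            int toStart = ranges.get(i + 1)[0];
            int gap = Math.max(0, toStart - fromEnd - 1);
            if (gap > 0) {
                unwanted.set(fromEnd + 1, toStart); // pages strictly between the ranges
            }
            numberOfUnwantedPages += gap;
        }
        System.out.println(unwanted + " unwanted=" + numberOfUnwantedPages); // {4} unwanted=1
    }
}
```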