[NO ISSUE][STO] Column cloud refactoring
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Refactor common code and some cleanups in cloud
column read path.
Change-Id: I2e98a86aa4d7f6d25b85b4ae86bbd3232a6ef0ac
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18382
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
index 5c6fe09..eacf4fd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
@@ -38,8 +38,7 @@
* Computes columns offsets, lengths, and pages
*/
public final class ColumnRanges {
- private static final LongComparator OFFSET_COMPARATOR =
- (x, y) -> Integer.compare(getOffsetFromPair(x), getOffsetFromPair(y));
+ private static final LongComparator OFFSET_COMPARATOR = IntPairUtil.FIRST_COMPARATOR;
private final int numberOfPrimaryKeys;
// For eviction
@@ -112,13 +111,14 @@
// Get the number of columns in a page
int numberOfColumns = leafFrame.getNumberOfColumns();
for (int i = 0; i < numberOfColumns; i++) {
- long offset = leafFrame.getColumnOffset(i);
+ int offset = leafFrame.getColumnOffset(i);
// Set the first 32-bits to the offset and the second 32-bits to columnIndex
- offsetColumnIndexPairs[i] = (offset << 32) + i;
+ offsetColumnIndexPairs[i] = IntPairUtil.of(offset, i);
}
// Set artificial offset to determine the last column's length
- offsetColumnIndexPairs[numberOfColumns] = (leafFrame.getMegaLeafNodeLengthInBytes() << 32) + numberOfColumns;
+ int megaLeafLength = leafFrame.getMegaLeafNodeLengthInBytes();
+ offsetColumnIndexPairs[numberOfColumns] = IntPairUtil.of(megaLeafLength, numberOfColumns);
// Sort the pairs by offset (i.e., lowest offset first)
LongArrays.stableSort(offsetColumnIndexPairs, 0, numberOfColumns, OFFSET_COMPARATOR);
@@ -251,11 +251,11 @@
}
private static int getOffsetFromPair(long pair) {
- return (int) (pair >> 32);
+ return IntPairUtil.getFirst(pair);
}
private static int getColumnIndexFromPair(long pair) {
- return (int) pair;
+ return IntPairUtil.getSecond(pair);
}
private void setCloudOnlyAndEvictablePages(int columnIndex, BitSet cloudOnlyColumns, BitSet evictableColumns,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/IntPairUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/IntPairUtil.java
new file mode 100644
index 0000000..c1fe0cf
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/IntPairUtil.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud;
+
+import it.unimi.dsi.fastutil.longs.LongComparator;
+
+public class IntPairUtil {
+ private IntPairUtil() {
+ }
+
+ public static final LongComparator FIRST_COMPARATOR = (x, y) -> Integer.compare(getFirst(x), getFirst(y));
+ public static final LongComparator SECOND_COMPARATOR = (x, y) -> Integer.compare(getSecond(x), getSecond(y));
+
+ public static int getFirst(long pair) {
+ return (int) (pair >> 32);
+ }
+
+ public static int getSecond(long pair) {
+ return (int) pair;
+ }
+
+ public static long of(int first, int second) {
+ return ((long) first << 32) + second;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
index fb3019f..119984c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
@@ -24,9 +24,7 @@
import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.QUERY;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.BitSet;
-import java.util.List;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
@@ -42,23 +40,18 @@
import org.apache.hyracks.storage.common.buffercache.CachedPage;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
-import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
import org.apache.hyracks.util.annotations.NotThreadSafe;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
@NotThreadSafe
public final class CloudColumnReadContext implements IColumnReadContext {
- private static final Logger LOGGER = LogManager.getLogger();
private final ColumnProjectorType operation;
private final IPhysicalDrive drive;
private final BitSet plan;
private final BitSet cloudOnlyColumns;
private final ColumnRanges columnRanges;
private final CloudMegaPageReadContext columnCtx;
- private final List<ICachedPage> pinnedPages;
private final BitSet projectedColumns;
public CloudColumnReadContext(IColumnProjectionInfo projectionInfo, IPhysicalDrive drive, BitSet plan) {
@@ -68,7 +61,6 @@
columnRanges = new ColumnRanges(projectionInfo.getNumberOfPrimaryKeys());
cloudOnlyColumns = new BitSet();
columnCtx = new CloudMegaPageReadContext(operation, columnRanges, drive);
- pinnedPages = new ArrayList<>();
projectedColumns = new BitSet();
if (operation == QUERY || operation == MODIFY) {
for (int i = 0; i < projectionInfo.getNumberOfProjectedColumns(); i++) {
@@ -146,8 +138,7 @@
private void pinAll(int fileId, int pageZeroId, int numberOfPages, IBufferCache bufferCache)
throws HyracksDataException {
- columnCtx.prepare(numberOfPages);
- pin(bufferCache, fileId, pageZeroId, 1, numberOfPages);
+ columnCtx.pin(bufferCache, fileId, pageZeroId, 1, numberOfPages);
}
private void pinProjected(int fileId, int pageZeroId, IBufferCache bufferCache) throws HyracksDataException {
@@ -192,23 +183,7 @@
}
int numberOfPages = lastPageIdx - firstPageIdx + 1;
- columnCtx.prepare(numberOfPages);
- pin(bufferCache, fileId, pageZeroId, firstPageIdx, numberOfPages);
- }
- }
-
- private void pin(IBufferCache bufferCache, int fileId, int pageZeroId, int start, int numberOfPages)
- throws HyracksDataException {
- for (int i = start; i < start + numberOfPages; i++) {
- long dpid = BufferedFileHandle.getDiskPageId(fileId, pageZeroId + i);
- try {
- pinnedPages.add(bufferCache.pin(dpid, columnCtx));
- } catch (Throwable e) {
- LOGGER.error("Error while pinning page number {} with number of pages {}. {}\n columnRanges:\n {}", i,
- numberOfPages, columnCtx, columnRanges);
- throw e;
- }
-
+ columnCtx.pin(bufferCache, fileId, pageZeroId, firstPageIdx, numberOfPages);
}
}
@@ -220,15 +195,7 @@
@Override
public void close(IBufferCache bufferCache) throws HyracksDataException {
- release(pinnedPages, bufferCache, columnCtx);
- columnCtx.close();
- }
-
- private static void release(List<ICachedPage> pinnedPages, IBufferCache bufferCache, IBufferCacheReadContext ctx)
- throws HyracksDataException {
- for (int i = 0; i < pinnedPages.size(); i++) {
- bufferCache.unpin(pinnedPages.get(i), ctx);
- }
- pinnedPages.clear();
+ columnCtx.unpinAll(bufferCache);
+ columnCtx.closeStream();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
index 2679d33..cd1e8ed 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
@@ -24,6 +24,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.cloud.buffercache.context.BufferCacheCloudReadContextUtil;
@@ -34,6 +36,7 @@
import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
@@ -48,6 +51,7 @@
private final ColumnProjectorType operation;
private final ColumnRanges columnRanges;
private final IPhysicalDrive drive;
+ private final List<ICachedPage> pinnedPages;
private int numberOfContiguousPages;
private int pageCounter;
@@ -61,12 +65,15 @@
this.operation = operation;
this.columnRanges = columnRanges;
this.drive = drive;
+ pinnedPages = new ArrayList<>();
}
- public void prepare(int numberOfContiguousPages) throws HyracksDataException {
- close();
- this.numberOfContiguousPages = numberOfContiguousPages;
+ void pin(IBufferCache bufferCache, int fileId, int pageZeroId, int start, int numberOfPages)
+ throws HyracksDataException {
+ closeStream();
+ this.numberOfContiguousPages = numberOfPages;
pageCounter = 0;
+ doPin(bufferCache, fileId, pageZeroId, start, numberOfPages);
}
@Override
@@ -81,7 +88,6 @@
* for this particular page to avoid placing the bytes of this page into another page's position.
*/
skipStreamIfOpened(cachedPage);
- pageCounter++;
}
}
@@ -131,15 +137,18 @@
skipStreamIfOpened(cPage);
}
- if (++pageCounter == numberOfContiguousPages) {
- close();
- }
-
// Finally process the header
return DEFAULT.processHeader(ioManager, fileHandle, header, cPage);
}
- void close() throws HyracksDataException {
+ void unpinAll(IBufferCache bufferCache) throws HyracksDataException {
+ for (int i = 0; i < pinnedPages.size(); i++) {
+ bufferCache.unpin(pinnedPages.get(i), this);
+ }
+ pinnedPages.clear();
+ }
+
+ void closeStream() throws HyracksDataException {
if (gapStream != null) {
if (remainingStreamBytes != 0) {
LOGGER.warn("Closed cloud stream with nonzero bytes = {}", remainingStreamBytes);
@@ -225,4 +234,20 @@
}
}
+ private void doPin(IBufferCache bufferCache, int fileId, int pageZeroId, int start, int numberOfPages)
+ throws HyracksDataException {
+ for (int i = start; i < start + numberOfPages; i++) {
+ long dpid = BufferedFileHandle.getDiskPageId(fileId, pageZeroId + i);
+ try {
+ pinnedPages.add(bufferCache.pin(dpid, this));
+ pageCounter++;
+ } catch (Throwable e) {
+ LOGGER.error(
+ "Error while pinning page number {} with number of pages {}. "
+ + "(streamOffset:{}, remainingStreamBytes: {}) columnRanges:\n {}",
+ i, numberOfPages, streamOffset, remainingStreamBytes, columnRanges);
+ throw e;
+ }
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
index d4b230a..1b4d09d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
@@ -90,7 +90,7 @@
return buf.getInt(NEXT_LEAF_OFFSET);
}
- public long getMegaLeafNodeLengthInBytes() {
+ public int getMegaLeafNodeLengthInBytes() {
return buf.getInt(MEGA_LEAF_NODE_LENGTH);
}