[ASTERIXDB-3389][STO] Support caching/evicting columns
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Prepare columnar indexes to support caching and
evicting columns.
Change-Id: Ib557608b0b25219ffc00a76325091a92f5e77698
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18260
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
index 43aa873..bbd8d87 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
@@ -93,6 +93,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-cloud</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
@@ -104,5 +109,10 @@
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/ColumnProjectorType.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/ColumnProjectorType.java
new file mode 100644
index 0000000..29258b5
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/ColumnProjectorType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.api.projection;
+
+/**
+ * Identifies the kind of operation on whose behalf a column projection was created.
+ * Used by the disk cache manager to decide how a read should interact with the
+ * column sweep planner (e.g., QUERY accesses are recorded for eviction planning,
+ * MODIFY reads use the default read context).
+ */
+public enum ColumnProjectorType {
+    // Projection created for an LSM merge operation
+    MERGE,
+    // Projection created for a read-only query
+    QUERY,
+    // Projection created for a modification (insert/update/delete) path
+    MODIFY
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/IColumnProjectionInfo.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/IColumnProjectionInfo.java
index 641f704..b1bfe87 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/IColumnProjectionInfo.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/IColumnProjectionInfo.java
@@ -48,4 +48,9 @@
* @return number of filtered columns
*/
int getNumberOfFilteredColumns();
+
+ /**
+ * @return the type of {@link IColumnTupleProjector} that created this projection info
+ */
+ ColumnProjectorType getProjectorType();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/CloudColumnIndexDiskCacheManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/CloudColumnIndexDiskCacheManager.java
new file mode 100644
index 0000000..38a1713
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/CloudColumnIndexDiskCacheManager.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud;
+
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.sweeper.SweepContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnReadContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read.CloudColumnReadContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read.DefaultColumnReadContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.write.CloudColumnWriteContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweepPlanner;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweeper;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.disk.ISweepContext;
+
+/**
+ * The disk manager cannot be shared among different partitions as columns are local to each partition.
+ * For example, column 9 in partition 0 corresponds to "salary" while column 9 in partition 1 corresponds to "age".
+ */
+public final class CloudColumnIndexDiskCacheManager implements IColumnIndexDiskCacheManager {
+    // Projector used when sweeping; determines which columns are materialized during eviction
+    private final IColumnTupleProjector sweepProjector;
+    // Local drive; its free space dictates whether pages may be persisted locally
+    private final IPhysicalDrive drive;
+    // Plans which columns/pages to evict, fed by access statistics
+    private final ColumnSweepPlanner planner;
+    // Executes the eviction plan produced by the planner
+    private final ColumnSweeper sweeper;
+
+    public CloudColumnIndexDiskCacheManager(int numberOfPrimaryKeys, IColumnTupleProjector sweepProjector,
+            IPhysicalDrive drive) {
+        this.sweepProjector = sweepProjector;
+        this.drive = drive;
+        planner = new ColumnSweepPlanner(numberOfPrimaryKeys, System::nanoTime);
+        sweeper = new ColumnSweeper(numberOfPrimaryKeys);
+    }
+
+    /**
+     * Activates the planner with the current on-disk state of the index.
+     *
+     * @param numberOfColumns number of columns in the index
+     * @param diskComponents  current disk components of the index
+     * @param bufferCache     buffer cache used to inspect components
+     */
+    @Override
+    public void activate(int numberOfColumns, List<ILSMDiskComponent> diskComponents, IBufferCache bufferCache)
+            throws HyracksDataException {
+        planner.onActivate(numberOfColumns, diskComponents, sweepProjector, bufferCache);
+    }
+
+    /**
+     * Creates a write context for flush/merge operations.
+     * NOTE(review): operationType is currently unused here — presumably reserved for
+     * distinguishing flush vs. merge writes in the future; confirm.
+     */
+    @Override
+    public IColumnWriteContext createWriteContext(int numberOfColumns, LSMIOOperationType operationType) {
+        return new CloudColumnWriteContext(drive, planner, numberOfColumns);
+    }
+
+    /**
+     * Creates a read context appropriate for the projector type:
+     * QUERY reads are recorded in the planner (for eviction decisions) and use the cloud-aware
+     * context; MODIFY reads register indexed columns and use the default (local) read context.
+     */
+    @Override
+    public IColumnReadContext createReadContext(IColumnProjectionInfo projectionInfo) {
+        ColumnProjectorType projectorType = projectionInfo.getProjectorType();
+        if (projectorType == ColumnProjectorType.QUERY) {
+            planner.access(projectionInfo, drive.hasSpace());
+        } else if (projectorType == ColumnProjectorType.MODIFY) {
+            planner.setIndexedColumns(projectionInfo);
+            // Requested (and indexed) columns will be persisted if space permits
+            return DefaultColumnReadContext.INSTANCE;
+        }
+        return new CloudColumnReadContext(projectionInfo, drive, planner.getPlanCopy());
+    }
+
+    @Override
+    public boolean isActive() {
+        return planner.isActive();
+    }
+
+    /**
+     * Cloud-backed columnar indexes are always sweepable (columns can be evicted and
+     * re-fetched from the cloud).
+     */
+    @Override
+    public boolean isSweepable() {
+        return true;
+    }
+
+    /**
+     * @return true if the planner produced a non-empty eviction plan
+     */
+    @Override
+    public boolean prepareSweepPlan() {
+        return planner.plan();
+    }
+
+    /**
+     * Executes the previously prepared sweep plan.
+     *
+     * @param context sweep context (must be a {@link SweepContext})
+     * @return freed space (bytes, per the sweeper's contract — confirm unit with ColumnSweeper)
+     */
+    @Override
+    public long sweep(ISweepContext context) throws HyracksDataException {
+        SweepContext sweepContext = (SweepContext) context;
+        return sweeper.sweep(planner.getPlanCopy(), sweepContext, sweepProjector);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
new file mode 100644
index 0000000..9fc60fa
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud;
+
+import static org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweeperUtil.EMPTY;
+import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getColumnPageIndex;
+import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getNumberOfPages;
+
+import java.util.BitSet;
+
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read.CloudColumnReadContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweepPlanner;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweeper;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+
+import it.unimi.dsi.fastutil.ints.IntArrays;
+import it.unimi.dsi.fastutil.longs.LongArrays;
+import it.unimi.dsi.fastutil.longs.LongComparator;
+
+/**
+ * Computes column offsets, lengths, and page ranges for a single column mega leaf node,
+ * and classifies the node's pages for caching/eviction purposes:
+ * non-evictable pages (must remain locally available), evictable pages (were or will be
+ * evicted), and cloud-only pages (locked by a concurrent sweep; must be read from the cloud).
+ */
+public final class ColumnRanges {
+    // Orders packed (offset, columnIndex) pairs by the offset stored in the upper 32 bits
+    private static final LongComparator OFFSET_COMPARATOR =
+            (x, y) -> Integer.compare(getOffsetFromPair(x), getOffsetFromPair(y));
+    private final int numberOfPrimaryKeys;
+
+    // For eviction
+    private final BitSet nonEvictablePages;
+
+    // For Query
+    private final BitSet evictablePages;
+    private final BitSet cloudOnlyPages;
+
+    private ColumnBTreeReadLeafFrame leafFrame;
+    // Each entry packs a column's byte offset (upper 32 bits) and its column index (lower 32 bits)
+    private long[] offsetColumnIndexPairs;
+    // lengths[columnIndex] = column length in bytes (0 for primary-key columns)
+    private int[] lengths;
+    // Requested non-PK column indexes ordered by offset, terminated by -1
+    private int[] columnsOrder;
+    // Page ID of the mega leaf node's page zero; used to compute page IDs relative to the node
+    private int pageZeroId;
+
+    public ColumnRanges(int numberOfPrimaryKeys) {
+        this.numberOfPrimaryKeys = numberOfPrimaryKeys;
+
+        offsetColumnIndexPairs = new long[0];
+        lengths = new int[0];
+        columnsOrder = new int[0];
+
+        nonEvictablePages = new BitSet();
+
+        evictablePages = new BitSet();
+        cloudOnlyPages = new BitSet();
+    }
+
+    /**
+     * @return number of primary keys
+     */
+    public int getNumberOfPrimaryKeys() {
+        return numberOfPrimaryKeys;
+    }
+
+    /**
+     * Reset ranges for initializing {@link ColumnSweepPlanner}
+     *
+     * @param leafFrame to compute the ranges for
+     */
+    public void reset(ColumnBTreeReadLeafFrame leafFrame) {
+        reset(leafFrame, EMPTY, EMPTY, EMPTY);
+    }
+
+    /**
+     * Reset column ranges for {@link ColumnSweeper}
+     *
+     * @param leafFrame to compute the ranges for
+     * @param plan      eviction plan
+     */
+    public void reset(ColumnBTreeReadLeafFrame leafFrame, BitSet plan) {
+        reset(leafFrame, plan, EMPTY, EMPTY);
+    }
+
+    /**
+     * Reset ranges for {@link CloudColumnReadContext}
+     *
+     * @param leafFrame        to compute the ranges for
+     * @param requestedColumns required columns
+     * @param evictableColumns columns that are or will be evicted
+     * @param cloudOnlyColumns locked columns that cannot be read from a local disk
+     */
+    public void reset(ColumnBTreeReadLeafFrame leafFrame, BitSet requestedColumns, BitSet evictableColumns,
+            BitSet cloudOnlyColumns) {
+        // Set leafFrame
+        this.leafFrame = leafFrame;
+        // Ensure arrays capacities (given the leafFrame's columns and pages)
+        init();
+
+        // Get the number of columns in a page
+        int numberOfColumns = leafFrame.getNumberOfColumns();
+        for (int i = 0; i < numberOfColumns; i++) {
+            long offset = leafFrame.getColumnOffset(i);
+            // Set the first 32-bits to the offset and the second 32-bits to columnIndex
+            offsetColumnIndexPairs[i] = (offset << 32) + i;
+        }
+
+        // Set artificial offset to determine the last column's length.
+        // Widen to long BEFORE shifting: if getMegaLeafNodeLengthInBytes() returns an int,
+        // an int shift by 32 is masked to a shift by 0 (JLS 15.19) and would corrupt the pair.
+        offsetColumnIndexPairs[numberOfColumns] =
+                ((long) leafFrame.getMegaLeafNodeLengthInBytes() << 32) + numberOfColumns;
+
+        // Sort the pairs by offset (i.e., lowest offset first). The sentinel at index
+        // numberOfColumns is excluded; it already holds the largest offset.
+        LongArrays.stableSort(offsetColumnIndexPairs, 0, numberOfColumns, OFFSET_COMPARATOR);
+
+        int columnOrdinal = 0;
+        for (int i = 0; i < numberOfColumns; i++) {
+            int columnIndex = getColumnIndexFromPair(offsetColumnIndexPairs[i]);
+            int offset = getOffsetFromPair(offsetColumnIndexPairs[i]);
+            int nextOffset = getOffsetFromPair(offsetColumnIndexPairs[i + 1]);
+
+            // Compute the column's length in bytes (set 0 for PKs)
+            int length = columnIndex < numberOfPrimaryKeys ? 0 : nextOffset - offset;
+            lengths[columnIndex] = length;
+
+            // Get start page ID (given the computed length above)
+            int startPageId = getColumnStartPageIndex(columnIndex);
+            // Get the number of pages (given the computed length above)
+            int numberOfPages = getColumnNumberOfPages(columnIndex);
+
+            if (columnIndex >= numberOfPrimaryKeys && requestedColumns.get(columnIndex)) {
+                // Set column index
+                columnsOrder[columnOrdinal++] = columnIndex;
+                // Compute cloud-only and evictable pages
+                setCloudOnlyAndEvictablePages(columnIndex, cloudOnlyColumns, evictableColumns, startPageId,
+                        numberOfPages);
+                // A requested column. Keep its pages as requested
+                continue;
+            }
+
+            // Mark the page as non-evictable
+            for (int j = startPageId; j < startPageId + numberOfPages; j++) {
+                nonEvictablePages.set(j);
+            }
+        }
+
+        // Set a sentinel bit one past the last page of the mega leaf node.
+        // NOTE(review): this bounds the non-evictable set at the node's page count —
+        // presumably consumed as an end marker by callers iterating the bitset; confirm.
+        nonEvictablePages.set(leafFrame.getMegaLeafNodeNumberOfPages());
+        // to indicate the end
+        columnsOrder[columnOrdinal] = -1;
+    }
+
+    /**
+     * First page of a column
+     *
+     * @param columnIndex column index
+     * @return pageID
+     */
+    public int getColumnStartPageIndex(int columnIndex) {
+        int pageSize = leafFrame.getBuffer().capacity();
+        return getColumnPageIndex(leafFrame.getColumnOffset(columnIndex), pageSize);
+    }
+
+    /**
+     * Length of a column in pages
+     *
+     * @param columnIndex column index
+     * @return number of pages (at least 1, even for zero-length columns such as PKs)
+     */
+    public int getColumnNumberOfPages(int columnIndex) {
+        int pageSize = leafFrame.getBuffer().capacity();
+        int numberOfPages = getNumberOfPages(getColumnLength(columnIndex), pageSize);
+        return numberOfPages == 0 ? 1 : numberOfPages;
+    }
+
+    /**
+     * Length of a column in bytes
+     *
+     * @param columnIndex column index
+     * @return number of bytes
+     */
+    public int getColumnLength(int columnIndex) {
+        return lengths[columnIndex];
+    }
+
+    /**
+     * Returns true if the page is meant to be read from the cloud only
+     *
+     * @param pageId page ID
+     * @return true if the page should be read from the cloud, false otherwise
+     * @see #reset(ColumnBTreeReadLeafFrame, BitSet, BitSet, BitSet)
+     */
+    public boolean isCloudOnly(int pageId) {
+        // Compute the relative page ID for this mega leaf node
+        int relativePageId = pageId - pageZeroId;
+        return cloudOnlyPages.get(relativePageId);
+    }
+
+    /**
+     * Whether the page has been or will be evicted
+     *
+     * @param pageId page ID
+     * @return true if the page was or will be evicted, false otherwise
+     */
+    public boolean isEvictable(int pageId) {
+        int relativePageId = pageId - pageZeroId;
+        return evictablePages.get(relativePageId);
+    }
+
+    /**
+     * @return Bitset of all non-requested pages
+     */
+    public BitSet getNonEvictablePages() {
+        return nonEvictablePages;
+    }
+
+    /**
+     * @return the order of columns that should be read in order to ensure (semi) sequential access.
+     *         Sequential means page X is read before page Y, for all X and Y where X < Y
+     */
+    public int[] getColumnsOrder() {
+        return columnsOrder;
+    }
+
+    // Grows the reusable arrays as needed and clears all page bitsets for the new leaf frame
+    private void init() {
+        int numberOfColumns = leafFrame.getNumberOfColumns();
+        offsetColumnIndexPairs = LongArrays.ensureCapacity(offsetColumnIndexPairs, numberOfColumns + 1, 0);
+        lengths = IntArrays.ensureCapacity(lengths, numberOfColumns, 0);
+        columnsOrder = IntArrays.ensureCapacity(columnsOrder, numberOfColumns + 1, 0);
+        nonEvictablePages.clear();
+        evictablePages.clear();
+        cloudOnlyPages.clear();
+        pageZeroId = leafFrame.getPageId();
+    }
+
+    // Upper 32 bits of the pair hold the column's byte offset
+    private static int getOffsetFromPair(long pair) {
+        return (int) (pair >> 32);
+    }
+
+    // Lower 32 bits of the pair hold the column index
+    private static int getColumnIndexFromPair(long pair) {
+        return (int) pair;
+    }
+
+    // Marks a requested column's pages as cloud-only (takes precedence) or evictable
+    private void setCloudOnlyAndEvictablePages(int columnIndex, BitSet cloudOnlyColumns, BitSet evictableColumns,
+            int startPageId, int numberOfPages) {
+        if (evictableColumns == EMPTY && cloudOnlyColumns == EMPTY) {
+            return;
+        }
+
+        // Find pages that meant to be read from the cloud only or are evictable
+        boolean cloudOnly = cloudOnlyColumns.get(columnIndex);
+        boolean evictable = evictableColumns.get(columnIndex);
+        if (cloudOnly || evictable) {
+            for (int j = startPageId; j < startPageId + numberOfPages; j++) {
+                if (cloudOnly) {
+                    cloudOnlyPages.set(j);
+                } else {
+                    evictablePages.set(j);
+                }
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        int numberOfPages = leafFrame.getMegaLeafNodeNumberOfPages();
+        builder.append("    ");
+        for (int i = 0; i < numberOfPages; i++) {
+            builder.append(String.format("%02d", i));
+            builder.append(" ");
+        }
+
+        builder.append('\n');
+        for (int i = 0; i < leafFrame.getNumberOfColumns(); i++) {
+            builder.append(String.format("%03d", i));
+            builder.append(":");
+            int startPageId = getColumnStartPageIndex(i);
+            int columnPagesCount = getColumnNumberOfPages(i);
+            printColumnPages(builder, numberOfPages, startPageId, columnPagesCount);
+        }
+
+        builder.append("nonEvictablePages: ");
+        builder.append(nonEvictablePages);
+        builder.append('\n');
+        builder.append("evictablePages: ");
+        builder.append(evictablePages);
+        builder.append('\n');
+        builder.append("cloudOnlyPages: ");
+        builder.append(cloudOnlyPages);
+
+        return builder.toString();
+    }
+
+    // Renders one row of the page-occupancy matrix: 1 if the column occupies the page, else 0
+    private void printColumnPages(StringBuilder builder, int numberOfPages, int startPageId, int columnPagesCount) {
+        for (int i = 0; i < numberOfPages; i++) {
+            builder.append("  ");
+            if (i >= startPageId && i < startPageId + columnPagesCount) {
+                builder.append(1);
+            } else {
+                builder.append(0);
+            }
+        }
+        builder.append('\n');
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
new file mode 100644
index 0000000..af69fe5
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
+
+import static org.apache.hyracks.cloud.buffercache.context.DefaultCloudReadContext.readAndPersistPage;
+import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MODIFY;
+import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.QUERY;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
+import org.apache.hyracks.cloud.buffercache.page.ISweepLockInfo;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnReadContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweepLockInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
+
+@NotThreadSafe
+public final class CloudColumnReadContext implements IColumnReadContext {
+    // The projector type that created this context (QUERY, MODIFY, or MERGE)
+    private final ColumnProjectorType operation;
+    // Local drive; free space determines whether fetched pages are persisted locally
+    private final IPhysicalDrive drive;
+    // Copy of the sweep planner's eviction plan (columns that are/will be evicted)
+    private final BitSet plan;
+    // Columns locked by a concurrent sweep; must be read from the cloud (filled in onPin)
+    private final BitSet cloudOnlyColumns;
+    // Per-mega-leaf-node page ranges computed in prepareColumns
+    private final ColumnRanges columnRanges;
+    // Read context used when pinning the pages of a contiguous column range
+    private final CloudMegaPageReadContext columnCtx;
+    // Pages pinned by prepareColumns; released in release/close
+    private final List<ICachedPage> pinnedPages;
+    // Columns requested by the projection (only populated for QUERY/MODIFY)
+    private final BitSet projectedColumns;
+
+    public CloudColumnReadContext(IColumnProjectionInfo projectionInfo, IPhysicalDrive drive, BitSet plan) {
+        this.operation = projectionInfo.getProjectorType();
+        this.drive = drive;
+        this.plan = plan;
+        columnRanges = new ColumnRanges(projectionInfo.getNumberOfPrimaryKeys());
+        cloudOnlyColumns = new BitSet();
+        columnCtx = new CloudMegaPageReadContext(operation, columnRanges, drive);
+        pinnedPages = new ArrayList<>();
+        projectedColumns = new BitSet();
+        if (operation == QUERY || operation == MODIFY) {
+            // Collect the requested column indexes; negative indexes are skipped
+            // (NOTE(review): presumably negative means "not present in this component" — confirm)
+            for (int i = 0; i < projectionInfo.getNumberOfProjectedColumns(); i++) {
+                int columnIndex = projectionInfo.getColumnIndex(i);
+                if (columnIndex >= 0) {
+                    projectedColumns.set(columnIndex);
+                }
+            }
+        }
+    }
+
+    /**
+     * If a sweep holds a lock on this page, record the swept (cloud-only) columns so
+     * subsequent reads route them to the cloud instead of the (partially evicted) local copy.
+     */
+    @Override
+    public void onPin(ICachedPage page) {
+        CloudCachedPage cloudPage = (CloudCachedPage) page;
+        ISweepLockInfo lockTest = cloudPage.beforeRead();
+        if (lockTest.isLocked()) {
+            ColumnSweepLockInfo lockedColumns = (ColumnSweepLockInfo) lockTest;
+            lockedColumns.getLockedColumns(cloudOnlyColumns);
+        }
+    }
+
+    @Override
+    public void onUnpin(ICachedPage page) {
+        CloudCachedPage cloudPage = (CloudCachedPage) page;
+        // Balances beforeRead() called in onPin
+        cloudPage.afterRead();
+    }
+
+    @Override
+    public boolean isNewPage() {
+        return false;
+    }
+
+    @Override
+    public boolean incrementStats() {
+        return true;
+    }
+
+    @Override
+    public ByteBuffer processHeader(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
+            CachedPage cPage) throws HyracksDataException {
+        // Page zero will be persisted (always) if free space permits
+        return readAndPersistPage(ioManager, fileHandle, header, cPage, drive.hasSpace());
+    }
+
+    /**
+     * Pins the next mega leaf node's page zero, then releases the pages pinned for the
+     * current node and unpins its page zero. The next page is pinned first so the frame
+     * is never left without a valid page.
+     */
+    @Override
+    public ICachedPage pinNext(ColumnBTreeReadLeafFrame leafFrame, IBufferCache bufferCache, int fileId)
+            throws HyracksDataException {
+        // TODO do we support prefetching?
+        ICachedPage nextPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, leafFrame.getNextLeaf()), this);
+        release(bufferCache);
+        bufferCache.unpin(leafFrame.getPage(), this);
+        leafFrame.setPage(nextPage);
+        return nextPage;
+    }
+
+    /**
+     * Computes the page ranges of the projected columns for this mega leaf node and pins
+     * them. Consecutive columns with (near-)contiguous pages are coalesced into a single
+     * pin range to keep page reads (semi) sequential.
+     */
+    @Override
+    public void prepareColumns(ColumnBTreeReadLeafFrame leafFrame, IBufferCache bufferCache, int fileId)
+            throws HyracksDataException {
+        if (leafFrame.getTupleCount() == 0) {
+            return;
+        }
+
+        // TODO handle prefetch if supported
+
+        columnRanges.reset(leafFrame, projectedColumns, plan, cloudOnlyColumns);
+        int pageZeroId = leafFrame.getPageId();
+        int[] columnsOrders = columnRanges.getColumnsOrder();
+        int i = 0;
+        int columnIndex = columnsOrders[i];
+        // columnsOrders is terminated by -1
+        while (columnIndex > -1) {
+            // Primary-key columns are stored in page zero; nothing extra to pin
+            if (columnIndex < columnRanges.getNumberOfPrimaryKeys()) {
+                columnIndex = columnsOrders[++i];
+                continue;
+            }
+
+            int startPageId = columnRanges.getColumnStartPageIndex(columnIndex);
+            // Will increment the number pages if the next column's pages are contiguous to this column's pages
+            int numberOfPages = columnRanges.getColumnNumberOfPages(columnIndex);
+
+            // Advance to the next column to check if it has contiguous pages
+            columnIndex = columnsOrders[++i];
+            while (columnIndex > -1) {
+                // Get the next column's start page ID
+                int nextStartPageId = columnRanges.getColumnStartPageIndex(columnIndex);
+                // NOTE(review): the "+ 1" also merges ranges separated by a single-page gap
+                // (the gap page gets pinned too) — presumably to keep reads sequential; confirm.
+                if (nextStartPageId > startPageId + numberOfPages + 1) {
+                    // The next startPageId is not contiguous, stop.
+                    break;
+                }
+
+                // Last page of this column
+                int nextLastPage = nextStartPageId + columnRanges.getColumnNumberOfPages(columnIndex);
+                // The next column's pages are contiguous. Combine its ranges with the previous one.
+                numberOfPages = nextLastPage - startPageId;
+                // Advance to the next column
+                columnIndex = columnsOrders[++i];
+            }
+
+            columnCtx.prepare(numberOfPages);
+            pin(bufferCache, fileId, pageZeroId, startPageId, numberOfPages);
+        }
+    }
+
+    // Pins numOfRequestedPages pages starting at (pageZeroId + start), tracking them for release
+    private void pin(IBufferCache bufferCache, int fileId, int pageZeroId, int start, int numOfRequestedPages)
+            throws HyracksDataException {
+        for (int i = start; i < start + numOfRequestedPages; i++) {
+            long dpid = BufferedFileHandle.getDiskPageId(fileId, pageZeroId + i);
+            pinnedPages.add(bufferCache.pin(dpid, columnCtx));
+        }
+    }
+
+    @Override
+    public void release(IBufferCache bufferCache) throws HyracksDataException {
+        // Release might differ in the future if prefetching is supported
+        close(bufferCache);
+    }
+
+    @Override
+    public void close(IBufferCache bufferCache) throws HyracksDataException {
+        release(pinnedPages, bufferCache, columnCtx);
+        columnCtx.close();
+    }
+
+    // Unpins all tracked pages and clears the tracking list
+    private static void release(List<ICachedPage> pinnedPages, IBufferCache bufferCache, IBufferCacheReadContext ctx)
+            throws HyracksDataException {
+        for (int i = 0; i < pinnedPages.size(); i++) {
+            bufferCache.unpin(pinnedPages.get(i), ctx);
+        }
+        pinnedPages.clear();
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
new file mode 100644
index 0000000..0f0b0b9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
+
+import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MERGE;
+import static org.apache.hyracks.storage.common.buffercache.IBufferCache.RESERVED_HEADER_BYTES;
+import static org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider.DEFAULT;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.buffercache.context.BufferCacheCloudReadContextUtil;
+import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
+import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+@NotThreadSafe
+final class CloudMegaPageReadContext implements IBufferCacheReadContext {
+ private static final Logger LOGGER = LogManager.getLogger();
+ private final ColumnProjectorType operation;
+ private final ColumnRanges columnRanges;
+ private final IPhysicalDrive drive;
+ private int numberOfContiguousPages;
+ private int pageCounter;
+ private InputStream gapStream;
+
+ CloudMegaPageReadContext(ColumnProjectorType operation, ColumnRanges columnRanges, IPhysicalDrive drive) {
+ this.operation = operation;
+ this.columnRanges = columnRanges;
+ this.drive = drive;
+ }
+
+ public void prepare(int numberOfContiguousPages) throws HyracksDataException {
+ close();
+ this.numberOfContiguousPages = numberOfContiguousPages;
+ pageCounter = 0;
+ }
+
+    @Override
+    public void onPin(ICachedPage page) throws HyracksDataException {
+        CloudCachedPage cachedPage = (CloudCachedPage) page;
+        if (gapStream != null && cachedPage.skipCloudStream()) {
+            // This page is valid in memory, so no buffer cache read() (which also persists the cloud bytes)
+            // will run for it. Skip its bytes in the stream so they are not written into another page's position.
+            try {
+                long remaining = cachedPage.getCompressedPageSize();
+                while (remaining > 0) {
+                    // skip() may legally return 0 (e.g., at end of stream); guard against spinning forever
+                    long skipped = gapStream.skip(remaining);
+                    if (skipped <= 0) {
+                        throw new IllegalStateException("Cloud stream ended prematurely");
+                    }
+                    remaining -= skipped;
+                }
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
+        }
+    }
+
+ @Override
+ public void onUnpin(ICachedPage page) {
+ // NoOp
+ }
+
+ @Override
+ public boolean isNewPage() {
+ return false;
+ }
+
+ @Override
+ public boolean incrementStats() {
+ return true;
+ }
+
+ @Override
+ public ByteBuffer processHeader(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
+ CachedPage cPage) throws HyracksDataException {
+ boolean empty = BufferCacheCloudReadContextUtil.isEmpty(header);
+ int pageId = BufferedFileHandle.getPageId(cPage.getDiskPageId());
+ boolean cloudOnly = columnRanges.isCloudOnly(pageId);
+ ByteBuffer buffer;
+ if (empty || cloudOnly || gapStream != null) {
+ boolean evictable = columnRanges.isEvictable(pageId);
+ /*
+ * Persist iff the following conditions are satisfied:
+ * - The page is empty
+ * - The page is not being evicted (cloudOnly)
+ * - The page is not planned for eviction (evictable)
+ * - The operation is not a merge operation (the component will be deleted anyway)
+ * - The disk has space
+ *
+ * Note: 'empty' can be false while 'cloudOnly' is true. We cannot read from disk as the page can be
+ * evicted at any moment. In other words, the sweeper told us that it is going to evict this page; hence
+ * 'cloudOnly' is true.
+ */
+ boolean persist = empty && !cloudOnly && !evictable && operation != MERGE && drive.hasSpace();
+ buffer = readFromStream(ioManager, fileHandle, header, cPage, persist);
+ buffer.position(RESERVED_HEADER_BYTES);
+ } else {
+ /*
+ * Here we can find a page that is planned for eviction, but it has not been evicted yet
+ * (i.e., empty = false). This could happen if the cursor is at a point the sweeper hasn't
+ * reached yet (i.e., cloudOnly = false).
+ */
+ buffer = DEFAULT.processHeader(ioManager, fileHandle, header, cPage);
+ }
+
+ if (++pageCounter == numberOfContiguousPages) {
+ close();
+ }
+
+ return buffer;
+ }
+
+ void close() throws HyracksDataException {
+ if (gapStream != null) {
+ try {
+ gapStream.close();
+ gapStream = null;
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+ }
+
+    private ByteBuffer readFromStream(IOManager ioManager, BufferedFileHandle fileHandle,
+            BufferCacheHeaderHelper header, CachedPage cPage, boolean persist) throws HyracksDataException {
+        InputStream stream = getOrCreateStream(ioManager, fileHandle, cPage);
+        ByteBuffer buffer = header.getBuffer();
+        buffer.position(0);
+        try {
+            // Fill the entire page buffer; a single read() may return fewer bytes than requested
+            for (int read = 0; buffer.remaining() > 0; buffer.position(buffer.position() + read)) {
+                read = stream.read(buffer.array(), buffer.position(), buffer.remaining());
+                if (read < 0) {
+                    throw new IllegalStateException("Stream should not be empty!");
+                }
+            }
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+
+        buffer.flip();
+
+        if (persist) {
+            ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
+            BufferCacheCloudReadContextUtil.persist(cloudIOManager, fileHandle.getFileHandle(), buffer,
+                    cPage.getCompressedPageOffset());
+        }
+
+        return buffer;
+    }
+
+    private InputStream getOrCreateStream(IOManager ioManager, BufferedFileHandle fileHandle, CachedPage cPage)
+            throws HyracksDataException {
+        if (gapStream != null) {
+            return gapStream;
+        }
+
+        // Compute the page count once, then log it (avoids duplicating the expression)
+        int requiredNumOfPages = numberOfContiguousPages - pageCounter;
+        LOGGER.info("Cloud stream read for {} pages", requiredNumOfPages);
+        long offset = cPage.getCompressedPageOffset();
+        int pageId = BufferedFileHandle.getPageId(cPage.getDiskPageId());
+        long length = fileHandle.getPagesTotalSize(pageId, requiredNumOfPages);
+
+        ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
+        gapStream = cloudIOManager.cloudRead(fileHandle.getFileHandle(), offset, length);
+
+        return gapStream;
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/write/CloudColumnWriteContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/write/CloudColumnWriteContext.java
new file mode 100644
index 0000000..dcde833
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/write/CloudColumnWriteContext.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.write;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.cloud.buffercache.context.DefaultCloudOnlyWriteContext;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweepPlanner;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheWriteContext;
+import org.apache.hyracks.storage.common.buffercache.context.write.DefaultBufferCacheWriteContext;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+
+import it.unimi.dsi.fastutil.ints.IntArrays;
+import it.unimi.dsi.fastutil.ints.IntSet;
+
+public final class CloudColumnWriteContext implements IColumnWriteContext {
+ private static final int INITIAL_NUMBER_OF_COLUMNS = 32;
+ private final IPhysicalDrive drive;
+ private final ColumnSweepPlanner planner;
+ private final BitSet plan;
+ private final IntSet indexedColumns;
+ private int[] sizes;
+ private int numberOfColumns;
+ private IBufferCacheWriteContext currentContext;
+ /**
+ * writeLocallyAndSwitchToCloudOnly = true means the next call to write() will persist the page locally and swap to cloud-only
+ * writer (i.e., the following pages will be written in the cloud but not locally).
+ * <p>
+ * 'writeLocallyAndSwitchToCloudOnly' is set to 'true' iff the previous column's last page should be persisted AND it is
+ * 'overlapping' with this column's first page.
+ */
+ private boolean writeLocallyAndSwitchToCloudOnly;
+
+ public CloudColumnWriteContext(IPhysicalDrive drive, ColumnSweepPlanner planner, int numberOfColumns) {
+ this.drive = drive;
+ this.planner = planner;
+ this.plan = planner.getPlanCopy();
+ this.indexedColumns = planner.getIndexedColumnsCopy();
+ int initialLength =
+ planner.getNumberOfPrimaryKeys() == numberOfColumns ? INITIAL_NUMBER_OF_COLUMNS : numberOfColumns;
+ sizes = new int[initialLength];
+ // Number of columns is not known during the flush operation
+ this.numberOfColumns = 0;
+ currentContext = DefaultBufferCacheWriteContext.INSTANCE;
+ }
+
+ @Override
+ public void startWritingColumn(int columnIndex, boolean overlapping) {
+ if (drive.hasSpace() || indexedColumns.contains(columnIndex)) {
+ // The current column should be persisted locally if free disk space permits
+ currentContext = DefaultBufferCacheWriteContext.INSTANCE;
+ } else if (plan.get(columnIndex)) {
+ // This column was planned for eviction, do not persist.
+ if (overlapping && currentContext == DefaultBufferCacheWriteContext.INSTANCE) {
+ // The previous column's last page should be persisted AND it is overlapping with the current column's
+ // first page. Persist the first page locally and switch to cloud-only writer.
+ writeLocallyAndSwitchToCloudOnly = true;
+ } else {
+ // The previous column's last page is not overlapping. Switch to cloud-only writer (if not already)
+ currentContext = DefaultCloudOnlyWriteContext.INSTANCE;
+ }
+ } else {
+ // Local drive is pressured. Write to cloud only.
+ currentContext = DefaultCloudOnlyWriteContext.INSTANCE;
+ }
+ }
+
+ @Override
+ public void endWritingColumn(int columnIndex, int size) {
+ ensureCapacity(columnIndex);
+ sizes[columnIndex] = Math.max(sizes[columnIndex], size);
+ }
+
+ @Override
+ public void columnsPersisted() {
+ // Set the default writer context to persist pageZero and the interior nodes' pages locally
+ currentContext = DefaultBufferCacheWriteContext.INSTANCE;
+ writeLocallyAndSwitchToCloudOnly = false;
+ }
+
+ @Override
+ public void close() {
+ // Report the sizes of the written columns
+ planner.adjustColumnSizes(sizes, numberOfColumns);
+ }
+
+ /*
+ * ************************************************************************************************
+ * WRITE methods
+ * ************************************************************************************************
+ */
+
+ @Override
+ public int write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer data)
+ throws HyracksDataException {
+ int writtenBytes = currentContext.write(ioManager, handle, offset, data);
+ switchIfNeeded();
+ return writtenBytes;
+ }
+
+ @Override
+ public long write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer[] data)
+ throws HyracksDataException {
+ long writtenBytes = currentContext.write(ioManager, handle, offset, data);
+ switchIfNeeded();
+ return writtenBytes;
+ }
+
+ /*
+ * ************************************************************************************************
+ * helper methods
+ * ************************************************************************************************
+ */
+
+ private void ensureCapacity(int columnIndex) {
+ int length = sizes.length;
+ if (columnIndex >= length) {
+ sizes = IntArrays.grow(sizes, columnIndex + 1);
+ }
+
+ numberOfColumns = Math.max(numberOfColumns, columnIndex + 1);
+ }
+
+ private void switchIfNeeded() {
+ if (writeLocallyAndSwitchToCloudOnly) {
+ // Switch to cloud-only writer
+ currentContext = DefaultCloudOnlyWriteContext.INSTANCE;
+ writeLocallyAndSwitchToCloudOnly = false;
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepLockInfo.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepLockInfo.java
new file mode 100644
index 0000000..ed83e84
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepLockInfo.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import java.util.BitSet;
+
+import org.apache.hyracks.cloud.buffercache.page.ISweepLockInfo;
+
+public final class ColumnSweepLockInfo implements ISweepLockInfo {
+ private final BitSet lockedColumns;
+
+ public ColumnSweepLockInfo() {
+ lockedColumns = new BitSet();
+ }
+
+ /**
+ * Reset the lock with plan's columns
+ *
+ * @param plan contains the columns to be locked
+ */
+ void reset(BitSet plan) {
+ lockedColumns.clear();
+ lockedColumns.or(plan);
+ }
+
+ /**
+ * Clear and set the locked columns in the provided {@link BitSet}
+ *
+ * @param lockedColumns used to get the locked columns
+ */
+ public void getLockedColumns(BitSet lockedColumns) {
+ lockedColumns.clear();
+ lockedColumns.or(this.lockedColumns);
+ }
+
+ @Override
+ public boolean isLocked() {
+ return true;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java
new file mode 100644
index 0000000..9549f4f
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import static org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils.isMergedComponent;
+
+import java.util.BitSet;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTree;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import it.unimi.dsi.fastutil.ints.IntArrays;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import it.unimi.dsi.fastutil.longs.LongArrays;
+
+public final class ColumnSweepPlanner {
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final double SIZE_WEIGHT = 0.3d;
+ private static final double LAST_ACCESS_WEIGHT = 1.0d - SIZE_WEIGHT;
+ private static final double INITIAL_PUNCHABLE_THRESHOLD = 0.7d;
+ private static final double PUNCHABLE_THRESHOLD_DECREMENT = 0.7d;
+ private static final int MAX_ITERATION_COUNT = 5;
+ private static final int REEVALUATE_PLAN_THRESHOLD = 50;
+
+ private final AtomicBoolean active;
+ private final int numberOfPrimaryKeys;
+ private final BitSet plan;
+ private final BitSet reevaluatedPlan;
+ private final IntSet indexedColumns;
+ private final ISweepClock clock;
+ private int numberOfColumns;
+ private long lastAccess;
+ private int maxSize;
+ private int[] sizes;
+ private long[] lastAccesses;
+
+ private double punchableThreshold;
+ private long lastSweepTs;
+ private int numberOfSweptColumns;
+ private int numberOfCloudRequests;
+
+ public ColumnSweepPlanner(int numberOfPrimaryKeys, ISweepClock clock) {
+ this.clock = clock;
+ active = new AtomicBoolean(false);
+ this.numberOfPrimaryKeys = numberOfPrimaryKeys;
+ sizes = new int[0];
+ lastAccesses = new long[0];
+ indexedColumns = new IntOpenHashSet();
+ plan = new BitSet();
+ reevaluatedPlan = new BitSet();
+ punchableThreshold = INITIAL_PUNCHABLE_THRESHOLD;
+ }
+
+ public boolean isActive() {
+ return active.get();
+ }
+
+ public int getNumberOfPrimaryKeys() {
+ return numberOfPrimaryKeys;
+ }
+
+ public void onActivate(int numberOfColumns, List<ILSMDiskComponent> diskComponents,
+ IColumnTupleProjector sweepProjector, IBufferCache bufferCache) throws HyracksDataException {
+ resizeStatsArrays(numberOfColumns);
+ setInitialSizes(diskComponents, sweepProjector, bufferCache);
+ active.set(true);
+ }
+
+ public void setIndexedColumns(IColumnProjectionInfo projectionInfo) {
+ indexedColumns.clear();
+ for (int i = 0; i < projectionInfo.getNumberOfProjectedColumns(); i++) {
+ int columnIndex = projectionInfo.getColumnIndex(i);
+ indexedColumns.add(columnIndex);
+ }
+ }
+
+ public IntSet getIndexedColumnsCopy() {
+ return new IntOpenHashSet(indexedColumns);
+ }
+
+ public synchronized void access(IColumnProjectionInfo projectionInfo, boolean hasSpace) {
+ resetPlanIfNeeded(hasSpace);
+ long accessTime = clock.getCurrentTime();
+ lastAccess = accessTime;
+ int numberOfColumns = projectionInfo.getNumberOfProjectedColumns();
+ boolean requireCloudAccess = false;
+ for (int i = 0; i < numberOfColumns; i++) {
+ int columnIndex = projectionInfo.getColumnIndex(i);
+ if (columnIndex >= 0) {
+ // columnIndex can be -1 when accessing a non-existing column (i.e., not known by the schema)
+ lastAccesses[columnIndex] = accessTime;
+ requireCloudAccess |= numberOfSweptColumns > 0 && plan.get(columnIndex);
+ }
+ }
+
+ numberOfCloudRequests += requireCloudAccess ? 1 : 0;
+ }
+
+ public synchronized void adjustColumnSizes(int[] newSizes, int numberOfColumns) {
+ resizeStatsArrays(numberOfColumns);
+ for (int i = 0; i < numberOfColumns; i++) {
+ int newSize = newSizes[i];
+ sizes[i] = Math.max(sizes[i], newSize);
+ maxSize = Math.max(maxSize, newSize);
+ }
+ }
+
+    public synchronized boolean plan() {
+        plan.clear();
+        int numberOfEvictableColumns = 0;
+        int iter = 0;
+        // Calculate weights: ensure the plan contains new columns that have never been swept
+        while (iter < MAX_ITERATION_COUNT && numberOfEvictableColumns < numberOfColumns) {
+            if (numberOfEvictableColumns > 0) {
+                // Do not reiterate if we found columns to evict
+                break;
+            }
+
+            // Find evictable columns
+            numberOfEvictableColumns += findEvictableColumns(plan);
+
+            // The next iteration/plan will be more aggressive
+            punchableThreshold *= PUNCHABLE_THRESHOLD_DECREMENT;
+            iter++;
+        }
+        // Register the plan time
+        lastSweepTs = clock.getCurrentTime();
+        // Add the number of evictable columns
+        numberOfSweptColumns += numberOfEvictableColumns;
+        if (numberOfEvictableColumns > 0) {
+            LOGGER.info("Planning to evict {} columns. The evictable columns are {}", numberOfEvictableColumns, plan);
+            return true;
+        }
+
+        LOGGER.info("Couldn't find columns to evict after {} iterations", iter);
+        return false;
+    }
+
+ public synchronized BitSet getPlanCopy() {
+ return (BitSet) plan.clone();
+ }
+
+    private double getWeight(int i, IntSet indexedColumns, int numberOfPrimaryKeys) {
+        if (i < numberOfPrimaryKeys || indexedColumns.contains(i)) {
+            return -1.0;
+        }
+
+        double sizeWeight = sizes[i] / (double) maxSize * SIZE_WEIGHT;
+        double lastAccessWeight = (lastAccess - lastAccesses[i]) / (double) lastAccess * LAST_ACCESS_WEIGHT;
+
+        return sizeWeight + lastAccessWeight;
+    }
+
+ private void resizeStatsArrays(int numberOfColumns) {
+ sizes = IntArrays.ensureCapacity(sizes, numberOfColumns);
+ lastAccesses = LongArrays.ensureCapacity(lastAccesses, numberOfColumns);
+ this.numberOfColumns = numberOfColumns - numberOfPrimaryKeys;
+ }
+
+ private void setInitialSizes(List<ILSMDiskComponent> diskComponents, IColumnTupleProjector sweepProjector,
+ IBufferCache bufferCache) throws HyracksDataException {
+ // This runs when activating an index (no need to synchronize on the opTracker)
+ if (diskComponents.isEmpty()) {
+ return;
+ }
+
+ IColumnProjectionInfo columnProjectionInfo =
+ ColumnSweeperUtil.createColumnProjectionInfo(diskComponents, sweepProjector);
+ ILSMDiskComponent latestComponent = diskComponents.get(0);
+ ILSMDiskComponent oldestComponent = diskComponents.get(diskComponents.size() - 1);
+ ColumnBTreeReadLeafFrame leafFrame = ColumnSweeperUtil.createLeafFrame(columnProjectionInfo, latestComponent);
+ ColumnRanges ranges = new ColumnRanges(columnProjectionInfo.getNumberOfPrimaryKeys());
+ // Get the column sizes from the freshest component, which has the columns of the most recent schema
+ setColumnSizes(latestComponent, leafFrame, ranges, bufferCache);
+ if (isMergedComponent(oldestComponent.getId())) {
+ // Get the column sizes from the oldest merged component, which probably has the largest columns
+ setColumnSizes(oldestComponent, leafFrame, ranges, bufferCache);
+ }
+ }
+
+ private void setColumnSizes(ILSMDiskComponent diskComponent, ColumnBTreeReadLeafFrame leafFrame,
+ ColumnRanges ranges, IBufferCache bufferCache) throws HyracksDataException {
+ ColumnBTree columnBTree = (ColumnBTree) diskComponent.getIndex();
+ long dpid = BufferedFileHandle.getDiskPageId(columnBTree.getFileId(), columnBTree.getBulkloadLeafStart());
+ ICachedPage page = bufferCache.pin(dpid);
+ try {
+ leafFrame.setPage(page);
+ ranges.reset(leafFrame);
+ for (int i = 0; i < leafFrame.getNumberOfColumns(); i++) {
+ sizes[i] = Math.max(sizes[i], ranges.getColumnLength(i));
+ maxSize = Math.max(maxSize, sizes[i]);
+ }
+ } finally {
+ bufferCache.unpin(page);
+ }
+ }
+
+ private int findEvictableColumns(BitSet plan) {
+ int numberOfEvictableColumns = 0;
+ for (int i = 0; i < sizes.length; i++) {
+ if (!plan.get(i) && getWeight(i, indexedColumns, numberOfPrimaryKeys) >= punchableThreshold) {
+ // Column reached a punchable threshold; include it in the eviction plan.
+ plan.set(i);
+ numberOfEvictableColumns++;
+ }
+ }
+
+ return numberOfEvictableColumns;
+ }
+
+    private void resetPlanIfNeeded(boolean hasSpace) {
+        if (!hasSpace || numberOfCloudRequests < REEVALUATE_PLAN_THRESHOLD) {
+            return;
+        }
+
+        numberOfCloudRequests = 0;
+        reevaluatedPlan.clear();
+        int numberOfEvictableColumns = findEvictableColumns(reevaluatedPlan);
+        // Iterate every set bit (evictable column) of the re-evaluated plan
+        for (int col = reevaluatedPlan.nextSetBit(0); col >= 0; col = reevaluatedPlan.nextSetBit(col + 1)) {
+            if (!plan.get(col)) {
+                // the plan contains a stale column. Invalidate!
+                plan.clear();
+                plan.or(reevaluatedPlan);
+                LOGGER.info("Re-planning to evict {} columns. The newly evictable columns are {}",
+                        numberOfEvictableColumns, plan);
+                break;
+            }
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
new file mode 100644
index 0000000..4932bf0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import static org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState.READABLE_UNWRITABLE;
+import static org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils.isMergedComponent;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
+import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.sweeper.SweepContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTree;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.LSMColumnBTree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.annotations.CriticalPath;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Sweeps the disk components of a columnar LSM index and frees local disk
+ * space by punching holes over the page ranges of columns selected for
+ * eviction by a sweep plan.
+ * <p>
+ * NOTE(review): instances reuse {@link #lockedColumns}, {@link #ranges}, and
+ * {@link #sweepableComponents} across calls, so a single instance should not
+ * be used by multiple threads concurrently -- confirm the caller guarantees
+ * this.
+ */
+public final class ColumnSweeper {
+ private static final Logger LOGGER = LogManager.getLogger();
+ // Columns locked during a page sweep; reset from the plan on every sweep()
+ private final ColumnSweepLockInfo lockedColumns;
+ // Reusable calculator of column/page ranges within a mega leaf node
+ private final ColumnRanges ranges;
+ // Candidate components captured by the latest captureSweepableComponents()
+ private final List<ILSMDiskComponent> sweepableComponents;
+
+ /**
+ * @param numberOfPrimaryKeys number of primary key columns
+ */
+ public ColumnSweeper(int numberOfPrimaryKeys) {
+ lockedColumns = new ColumnSweepLockInfo();
+ ranges = new ColumnRanges(numberOfPrimaryKeys);
+ sweepableComponents = new ArrayList<>();
+ }
+
+ /**
+ * Executes the given eviction plan against all sweepable components of the
+ * index referenced by {@code context}.
+ *
+ * @param plan bit set of column indexes to evict
+ * @param context sweep context (index unit, pin/unpin, punch-hole, stop signal)
+ * @param sweepProjector projector used to build the column projection info
+ * @return the number of bytes freed from disk (0 when there is nothing to
+ * sweep or the index is being dropped)
+ * @throws HyracksDataException on failure while sweeping a component
+ */
+ public long sweep(BitSet plan, SweepContext context, IColumnTupleProjector sweepProjector)
+ throws HyracksDataException {
+ IndexUnit indexUnit = context.getIndexUnit();
+ LSMColumnBTree lsmColumnBTree = (LSMColumnBTree) indexUnit.getIndex();
+ IColumnProjectionInfo projectionInfo = captureSweepableComponents(lsmColumnBTree, sweepProjector);
+ if (projectionInfo == null) {
+ // no sweepable components
+ return 0L;
+ }
+
+ LOGGER.info("Sweeping {}", lsmColumnBTree);
+ ILSMDiskComponent latestComponent = sweepableComponents.get(0);
+ ColumnBTreeReadLeafFrame leafFrame = ColumnSweeperUtil.createLeafFrame(projectionInfo, latestComponent);
+ IBufferCacheReadContext bcOpCtx = SweepBufferCacheReadContext.INSTANCE;
+ lockedColumns.reset(plan);
+ long freedSpace = 0L;
+ for (int i = 0; i < sweepableComponents.size(); i++) {
+ if (context.stopSweeping()) {
+ // Exit as the index is being dropped
+ return 0L;
+ }
+
+ boolean failed = false;
+ // Components are entered one at a time to allow components to be merged and deactivated
+ ILSMDiskComponent diskComponent = enterAndGetComponent(i, lsmColumnBTree);
+ // If diskComponent is null, that means it is not a viable candidate anymore
+ if (diskComponent != null) {
+ try {
+ freedSpace += sweepDiskComponent(leafFrame, diskComponent, plan, context, bcOpCtx);
+ } catch (Throwable e) {
+ failed = true;
+ throw e;
+ } finally {
+ exitComponent(diskComponent, lsmColumnBTree, failed);
+ }
+ }
+ }
+
+ LOGGER.info("Swept {} components and freed {} from disk", sweepableComponents.size(),
+ StorageUtil.toHumanReadableSize(freedSpace));
+ return freedSpace;
+ }
+
+ /**
+ * Collects, under the operation tracker's monitor, the merged components
+ * that are in READABLE_UNWRITABLE state and builds a projection info from
+ * the newest captured component.
+ *
+ * @return the projection info, or null when there is nothing to sweep
+ */
+ @CriticalPath
+ private IColumnProjectionInfo captureSweepableComponents(LSMColumnBTree lsmColumnBTree,
+ IColumnTupleProjector sweepProjector) throws HyracksDataException {
+ ILSMOperationTracker opTracker = lsmColumnBTree.getOperationTracker();
+ sweepableComponents.clear();
+ synchronized (opTracker) {
+ List<ILSMDiskComponent> diskComponents = lsmColumnBTree.getDiskComponents();
+ for (int i = 0; i < diskComponents.size(); i++) {
+ ILSMDiskComponent diskComponent = diskComponents.get(i);
+ /*
+ * Get components that are only in READABLE_UNWRITABLE state. Components that are currently being
+ * merged should not be swept as they will be deleted anyway.
+ * Also, only sweep merged components as flushed components are relatively smaller and should be
+ * merged eventually. So, it is preferable to read everything locally when merging flushed components.
+ * TODO should we sweep flushed components?
+ */
+ if (isMergedComponent(diskComponent.getId()) && diskComponent.getState() == READABLE_UNWRITABLE
+ && diskComponent.getComponentSize() > 0) {
+ // The component is a good candidate to be swept
+ sweepableComponents.add(diskComponent);
+ }
+ }
+
+ if (sweepableComponents.isEmpty()) {
+ // No sweepable components
+ return null;
+ }
+
+ return ColumnSweeperUtil.createColumnProjectionInfo(sweepableComponents, sweepProjector);
+ }
+ }
+
+ /**
+ * Tries to enter the captured component at {@code index} for a
+ * DISK_COMPONENT_SCAN operation.
+ *
+ * @return the component, or null when it is no longer a viable candidate
+ */
+ private ILSMDiskComponent enterAndGetComponent(int index, LSMColumnBTree lsmColumnBTree)
+ throws HyracksDataException {
+ ILSMDiskComponent diskComponent = sweepableComponents.get(index);
+ synchronized (lsmColumnBTree.getOperationTracker()) {
+ // Make sure the component is still in READABLE_UNWRITABLE state
+ if (diskComponent.getState() == READABLE_UNWRITABLE
+ && diskComponent.threadEnter(LSMOperationType.DISK_COMPONENT_SCAN, false)) {
+ return diskComponent;
+ }
+ }
+
+ // the component is not a viable candidate anymore
+ return null;
+ }
+
+ /**
+ * Exits a component previously entered by {@link #enterAndGetComponent}.
+ *
+ * @param failed whether the sweep of this component threw
+ */
+ private void exitComponent(ILSMDiskComponent diskComponent, LSMColumnBTree lsmColumnBTree, boolean failed)
+ throws HyracksDataException {
+ synchronized (lsmColumnBTree.getOperationTracker()) {
+ diskComponent.threadExit(LSMOperationType.DISK_COMPONENT_SCAN, failed, false);
+ }
+ }
+
+ /**
+ * Walks the leaf pages of one disk component and punches holes over the
+ * planned columns of every page whose sweep lock can be acquired; pages
+ * whose lock cannot be acquired are skipped.
+ *
+ * @return the number of bytes freed from this component
+ */
+ private long sweepDiskComponent(ColumnBTreeReadLeafFrame leafFrame, ILSMDiskComponent diskComponent, BitSet plan,
+ SweepContext context, IBufferCacheReadContext bcOpCtx) throws HyracksDataException {
+ long dpid = getFirstPageId(diskComponent);
+ int fileId = BufferedFileHandle.getFileId(dpid);
+ int nextPageId = BufferedFileHandle.getPageId(dpid);
+ // NOTE(review): freedSpace is an int while the method returns long; more
+ // than 2GB freed from a single component would overflow -- consider long.
+ int freedSpace = 0;
+ context.open(fileId);
+ try {
+ while (nextPageId >= 0) {
+ if (context.stopSweeping()) {
+ // Exit as the index is being dropped
+ return 0L;
+ }
+ CloudCachedPage page0 = context.pin(BufferedFileHandle.getDiskPageId(fileId, nextPageId), bcOpCtx);
+ boolean columnsLocked = false;
+ try {
+ leafFrame.setPage(page0);
+ nextPageId = leafFrame.getNextLeaf();
+ columnsLocked = page0.trySweepLock(lockedColumns);
+ if (columnsLocked) {
+ // NOTE(review): setPage(page0) was already called above -- the
+ // repeated call looks redundant; confirm it is intentional.
+ leafFrame.setPage(page0);
+ ranges.reset(leafFrame, plan);
+ freedSpace += punchHoles(context, leafFrame);
+ }
+ } finally {
+ if (columnsLocked) {
+ page0.sweepUnlock();
+ }
+ context.unpin(page0, bcOpCtx);
+ }
+ }
+ } finally {
+ context.close();
+ }
+
+ return freedSpace;
+ }
+
+ /**
+ * @return the disk page id (fileId + pageId) of the component's first
+ * bulk-loaded leaf page
+ */
+ private long getFirstPageId(ILSMDiskComponent diskComponent) {
+ ColumnBTree columnBTree = (ColumnBTree) diskComponent.getIndex();
+ int fileId = columnBTree.getFileId();
+ int firstPage = columnBTree.getBulkloadLeafStart();
+ return BufferedFileHandle.getDiskPageId(fileId, firstPage);
+ }
+
+ /**
+ * Punches a hole over every maximal run of evictable pages within the
+ * current mega leaf node (page zero is never evicted).
+ *
+ * @return the number of bytes freed
+ */
+ private int punchHoles(SweepContext context, ColumnBTreeReadLeafFrame leafFrame) throws HyracksDataException {
+ int freedSpace = 0;
+ int numberOfPages = leafFrame.getMegaLeafNodeNumberOfPages();
+ int pageZeroId = leafFrame.getPageId();
+
+ // Start from 1 as we do not evict pageZero
+ BitSet nonEvictablePages = ranges.getNonEvictablePages();
+ int start = nonEvictablePages.nextClearBit(1);
+ // Assumes nonEvictablePages has a set bit bounding every clear run inside
+ // the mega leaf node; otherwise nextSetBit() would return -1 -- TODO confirm
+ while (start < numberOfPages) {
+ int end = nonEvictablePages.nextSetBit(start);
+ int numberOfEvictablePages = end - start;
+ freedSpace += context.punchHole(pageZeroId + start, numberOfEvictablePages);
+
+ start = nonEvictablePages.nextClearBit(end);
+ }
+
+ return freedSpace;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeperUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeperUtil.java
new file mode 100644
index 0000000..e51f1d7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeperUtil.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTree;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeLeafFrameFactory;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+
+/**
+ * Static helpers shared by the column sweeper and sweep planner.
+ */
+public class ColumnSweeperUtil {
+ private ColumnSweeperUtil() {
+ }
+
+ // NOTE(review): BitSet is mutable; exposing a shared public instance risks
+ // accidental modification by callers -- confirm all usages are read-only.
+ public static final BitSet EMPTY = new BitSet();
+
+ /**
+ * Builds a projection info from the column metadata of the newest component.
+ *
+ * @param diskComponents captured components; index 0 is the newest
+ * @param projector projector that interprets the column metadata
+ */
+ static IColumnProjectionInfo createColumnProjectionInfo(List<ILSMDiskComponent> diskComponents,
+ IColumnTupleProjector projector) throws HyracksDataException {
+ // Get the newest disk component, which has the newest column metadata
+ ILSMDiskComponent newestComponent = diskComponents.get(0);
+ IValueReference columnMetadata = ColumnUtil.getColumnMetadataCopy(newestComponent.getMetadata());
+
+ return projector.createProjectionInfo(columnMetadata);
+ }
+
+ /**
+ * Creates a read leaf frame for the given projection using the component's
+ * leaf frame factory.
+ */
+ static ColumnBTreeReadLeafFrame createLeafFrame(IColumnProjectionInfo projectionInfo,
+ ILSMDiskComponent diskComponent) {
+ ColumnBTree columnBTree = (ColumnBTree) diskComponent.getIndex();
+ ColumnBTreeLeafFrameFactory leafFrameFactory = (ColumnBTreeLeafFrameFactory) columnBTree.getLeafFrameFactory();
+ return leafFrameFactory.createReadFrame(projectionInfo);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ISweepClock.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ISweepClock.java
new file mode 100644
index 0000000..f36a51d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ISweepClock.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+/**
+ * Time source used by sweep planning; abstracted so tests can control time.
+ */
+@FunctionalInterface
+public interface ISweepClock {
+ /**
+ * @return the current time (units defined by the implementation)
+ */
+ long getCurrentTime();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/SweepBufferCacheReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/SweepBufferCacheReadContext.java
new file mode 100644
index 0000000..7b51d55
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/SweepBufferCacheReadContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import static org.apache.hyracks.cloud.buffercache.context.DefaultCloudReadContext.readAndPersistPage;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+/**
+ * Buffer cache read context used by the sweeper: pin/unpin are no-ops for
+ * stats purposes, and pages read during a sweep are not persisted locally
+ * since the disk is already under space pressure. Stateless singleton.
+ */
+@ThreadSafe
+final class SweepBufferCacheReadContext implements IBufferCacheReadContext {
+ static final IBufferCacheReadContext INSTANCE = new SweepBufferCacheReadContext();
+
+ private SweepBufferCacheReadContext() {
+ }
+
+ @Override
+ public void onPin(ICachedPage page) {
+ // NoOp
+ }
+
+ @Override
+ public void onUnpin(ICachedPage page) {
+ // NoOp
+ }
+
+ @Override
+ public boolean isNewPage() {
+ return false;
+ }
+
+ @Override
+ public boolean incrementStats() {
+ // Do not increment the stats for the sweeper
+ return false;
+ }
+
+ @Override
+ public ByteBuffer processHeader(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
+ CachedPage cPage) throws HyracksDataException {
+ // Will not persist as the disk is pressured
+ return readAndPersistPage(ioManager, fileHandle, header, cPage, false);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlannerTest.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlannerTest.java
new file mode 100644
index 0000000..6bace98
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlannerTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.QUERY;
+import static org.apache.hyracks.util.StorageUtil.getIntSizeInBytes;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Random;
+
+import org.apache.hyracks.storage.am.lsm.btree.column.dummy.DummyColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.dummy.DummySweepClock;
+import org.apache.hyracks.util.StorageUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+
+/**
+ * Unit test for ColumnSweepPlanner: verifies that the eviction plan changes
+ * when the set of frequently projected (accessed) columns changes. Uses a
+ * fixed random seed and a manually advanced clock for determinism.
+ */
+public class ColumnSweepPlannerTest {
+ private static final int MAX_MEGA_LEAF_NODE_SIZE = getIntSizeInBytes(10, StorageUtil.StorageUnit.MEGABYTE);
+ // Fixed seed keeps the generated sizes/projections reproducible
+ private static final Random RANDOM = new Random(0);
+ private final DummySweepClock clock = new DummySweepClock();
+
+ @Test
+ public void test10Columns() {
+ int numberOfPrimaryKeys = 1;
+ int numberOfColumns = numberOfPrimaryKeys + 10;
+ int[] columnSizes = createNormalColumnSizes(numberOfPrimaryKeys, numberOfColumns);
+ ColumnSweepPlanner planner = new ColumnSweepPlanner(numberOfPrimaryKeys, clock);
+ IntList projectedColumns = new IntArrayList();
+ DummyColumnProjectionInfo info = new DummyColumnProjectionInfo(numberOfPrimaryKeys, QUERY, projectedColumns);
+
+ // Adjust sizes
+ planner.adjustColumnSizes(columnSizes, numberOfColumns);
+
+ // Project 3 columns
+ projectedColumns(numberOfPrimaryKeys, numberOfColumns, 3, projectedColumns);
+ // access the projected columns (max 10 times)
+ access(planner, info, true, 10);
+
+ // Advance clock
+ clock.advance(10);
+
+ // Plan for eviction
+ BitSet keptColumns = new BitSet();
+ planner.plan();
+ computeKeptColumns(planner.getPlanCopy(), keptColumns, numberOfColumns);
+
+ // Project another 3 columns
+ projectedColumns(numberOfPrimaryKeys, numberOfColumns, 3, projectedColumns);
+ // access the projected columns
+ access(planner, info, true, 100);
+
+ // At this point, the plan should change
+ BitSet newKeptColumns = new BitSet();
+ computeKeptColumns(planner.getPlanCopy(), newKeptColumns, numberOfColumns);
+
+ Assert.assertNotEquals(keptColumns, newKeptColumns);
+ }
+
+ /**
+ * Derives the set of kept columns as the complement of the eviction plan.
+ */
+ private void computeKeptColumns(BitSet plan, BitSet keptColumns, int numberOfColumns) {
+ keptColumns.clear();
+ for (int i = 0; i < numberOfColumns; i++) {
+ if (!plan.get(i)) {
+ keptColumns.set(i);
+ }
+ }
+
+ System.out.println("Kept columns: " + keptColumns);
+ }
+
+ /**
+ * Reports between 1 and {@code bound - 1} accesses of the projected columns
+ * to the planner, advancing the clock by 1 per access.
+ */
+ private void access(ColumnSweepPlanner planner, DummyColumnProjectionInfo info, boolean hasSpace, int bound) {
+ int numberOfAccesses = RANDOM.nextInt(1, bound);
+ for (int i = 0; i < numberOfAccesses; i++) {
+ planner.access(info, hasSpace);
+ clock.advance(1);
+ }
+
+ System.out.println("Accessed: " + info + " " + numberOfAccesses + " time");
+ }
+
+ /**
+ * Fills {@code projectedColumns} with distinct random non-primary-key
+ * column indexes.
+ */
+ private void projectedColumns(int numberPrimaryKeys, int numberOfColumns, int numberOfProjectedColumns,
+ IntList projectedColumns) {
+ IntSet alreadyProjectedColumns = new IntOpenHashSet();
+ projectedColumns.clear();
+ for (int i = 0; i < numberOfProjectedColumns; i++) {
+ int columnIndex = RANDOM.nextInt(numberPrimaryKeys, numberOfColumns);
+ // Re-draw until an unused column index is found
+ while (alreadyProjectedColumns.contains(columnIndex)) {
+ columnIndex = RANDOM.nextInt(numberPrimaryKeys, numberOfColumns);
+ }
+ projectedColumns.add(columnIndex);
+ alreadyProjectedColumns.add(columnIndex);
+ }
+ }
+
+ /**
+ * Generates random column sizes that sum to roughly the max mega leaf node
+ * size. Primary key columns keep size 0 (the fill loop starts at
+ * numberOfPrimaryKeys).
+ */
+ private int[] createNormalColumnSizes(int numberOfPrimaryKeys, int numberOfColumns) {
+ int[] columnSizes = new int[numberOfColumns];
+ double[] normalDistribution = new double[numberOfColumns];
+ double sum = 0.0d;
+ for (int i = 0; i < numberOfColumns; i++) {
+ double value = Math.abs(RANDOM.nextGaussian());
+ normalDistribution[i] = value;
+ sum += value;
+ }
+
+ for (int i = numberOfPrimaryKeys; i < numberOfColumns; i++) {
+ int size = (int) Math.round((normalDistribution[i] / sum) * MAX_MEGA_LEAF_NODE_SIZE);
+ columnSizes[i] = size;
+ }
+
+ System.out.println("Column sizes:");
+ for (int i = 0; i < numberOfColumns; i++) {
+ System.out.println(i + ": " + StorageUtil.toHumanReadableSize(columnSizes[i]));
+ }
+ System.out.println("TotalSize:" + StorageUtil.toHumanReadableSize(Arrays.stream(columnSizes).sum()));
+ return columnSizes;
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummyColumnProjectionInfo.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummyColumnProjectionInfo.java
new file mode 100644
index 0000000..3b9d6ba
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummyColumnProjectionInfo.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.dummy;
+
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+
+import it.unimi.dsi.fastutil.ints.IntList;
+
+/**
+ * Test-only {@link IColumnProjectionInfo} backed by an explicit list of
+ * projected column indexes. Declares no filtered columns.
+ */
+public class DummyColumnProjectionInfo implements IColumnProjectionInfo {
+ private final int numberOfPrimaryKeys;
+ private final ColumnProjectorType projectorType;
+ // Projected column indexes; the list is shared with the caller, not copied
+ private final IntList projectedColumns;
+
+ public DummyColumnProjectionInfo(int numberOfPrimaryKeys, ColumnProjectorType projectorType,
+ IntList projectedColumns) {
+ this.numberOfPrimaryKeys = numberOfPrimaryKeys;
+ this.projectorType = projectorType;
+ this.projectedColumns = projectedColumns;
+ }
+
+ @Override
+ public int getColumnIndex(int ordinal) {
+ return projectedColumns.getInt(ordinal);
+ }
+
+ @Override
+ public int getNumberOfProjectedColumns() {
+ return projectedColumns.size();
+ }
+
+ @Override
+ public int getNumberOfPrimaryKeys() {
+ return numberOfPrimaryKeys;
+ }
+
+ @Override
+ public int getFilteredColumnIndex(int ordinal) {
+ // No filtered columns in this dummy implementation
+ return -1;
+ }
+
+ @Override
+ public int getNumberOfFilteredColumns() {
+ return 0;
+ }
+
+ @Override
+ public ColumnProjectorType getProjectorType() {
+ return projectorType;
+ }
+
+ @Override
+ public String toString() {
+ return projectedColumns.toString();
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummySweepClock.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummySweepClock.java
new file mode 100644
index 0000000..6f209ec
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummySweepClock.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.dummy;
+
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ISweepClock;
+
+/**
+ * Manually advanced {@link ISweepClock} for deterministic planner tests.
+ */
+public class DummySweepClock implements ISweepClock {
+ // Current test time; only moves forward via advance()
+ long timestamp;
+
+ @Override
+ public long getCurrentTime() {
+ return timestamp;
+ }
+
+ /**
+ * Advances the clock by {@code delta} time units.
+ */
+ public void advance(long delta) {
+ timestamp += delta;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java
index 1779527..98d2d73 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java
@@ -55,15 +55,17 @@
}
public static void persist(ILSMComponentId id, IComponentMetadata metadata) throws HyracksDataException {
- LSMComponentId componentId = (LSMComponentId) id;
- metadata.put(COMPONENT_ID_MIN_KEY, LongPointable.FACTORY.createPointable(componentId.getMinId()));
- metadata.put(COMPONENT_ID_MAX_KEY, LongPointable.FACTORY.createPointable(componentId.getMaxId()));
+ metadata.put(COMPONENT_ID_MIN_KEY, LongPointable.FACTORY.createPointable(id.getMinId()));
+ metadata.put(COMPONENT_ID_MAX_KEY, LongPointable.FACTORY.createPointable(id.getMaxId()));
}
public static ILSMComponentId union(ILSMComponentId id1, ILSMComponentId id2) {
- long minId = Long.min(((LSMComponentId) id1).getMinId(), ((LSMComponentId) id2).getMinId());
- long maxId = Long.max(((LSMComponentId) id1).getMaxId(), ((LSMComponentId) id2).getMaxId());
+ long minId = Long.min(id1.getMinId(), id2.getMinId());
+ long maxId = Long.max(id1.getMaxId(), id2.getMaxId());
return new LSMComponentId(minId, maxId);
}
+ /**
+ * @return true if the component id spans a range (minId < maxId), i.e. the
+ * component is the product of a merge; presumably a flushed component
+ * has minId == maxId -- confirm against the id-assignment logic
+ */
+ public static boolean isMergedComponent(ILSMComponentId id) {
+ return id.getMinId() < id.getMaxId();
+ }
}