[ASTERIXDB-3389][STO] Prep. columnar datasets for disk caching
- user model changes: no
- storage format changes: yes
- interface changes: yes
Details:
- Several modifications in columnar APIs to accommodate
disk caching
- Pass read/write buffer cache context to LSMColumnBTree
bulkloader and cursors
- Some refactoring in hyracks-cloud API (ASTERIXDB-3375)
Storage changes:
- Store the length (in bytes) of the mega leaf node in
Page0 instead of the of the number of pages
Change-Id: Iababcc5fc1d4e5e2de36f9b26c3f86ffabfb4e54
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18255
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java
index c1402c8..6db8a78 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java
@@ -58,6 +58,11 @@
}
@Override
+ public int getNumberOfPrimaryKeys() {
+ return primaryKeys.size();
+ }
+
+ @Override
public IColumnMetadata activate() throws HyracksDataException {
Mutable<IColumnWriteMultiPageOp> multiPageOpRef = new MutableObject<>();
IColumnValuesWriterFactory factory = new ColumnValuesWriterFactory(multiPageOpRef);
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnMetadata.java
index 4e19cbc..2e95384 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnMetadata.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnMetadata.java
@@ -61,5 +61,6 @@
return recordFieldIndex + 1;
}
+ @Override
public abstract int getNumberOfColumns();
}
\ No newline at end of file
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java
index ae3559d..f597e4f 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnMetadata;
import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
public class FlushColumnTupleReaderWriterFactory extends AbstractColumnTupleReaderWriterFactory {
private static final long serialVersionUID = -9197679192729634493L;
@@ -34,7 +35,8 @@
}
@Override
- public AbstractColumnTupleWriter createColumnWriter(IColumnMetadata columnMetadata) {
+ public AbstractColumnTupleWriter createColumnWriter(IColumnMetadata columnMetadata,
+ IColumnWriteContext writeContext) {
FlushColumnMetadata flushColumnMetadata = (FlushColumnMetadata) columnMetadata;
if (flushColumnMetadata.getMetaType() == null) {
//no meta
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
index 7fc6fbd..85569f9 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnMetadata;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
import org.apache.hyracks.storage.common.MultiComparator;
public class LoadColumnTupleReaderWriterFactory extends FlushColumnTupleReaderWriterFactory {
@@ -36,7 +37,8 @@
}
@Override
- public AbstractColumnTupleWriter createColumnWriter(IColumnMetadata columnMetadata) {
+ public AbstractColumnTupleWriter createColumnWriter(IColumnMetadata columnMetadata,
+ IColumnWriteContext writeContext) {
return new LoadColumnTupleWriter((FlushColumnMetadata) columnMetadata, pageSize, maxNumberOfTuples, tolerance,
maxLeafNodeSize, MultiComparator.create(cmpFactories));
}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java
index ae1c8d2..d792855 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnMetadata;
import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
public class MergeColumnTupleReaderWriterFactory extends AbstractColumnTupleReaderWriterFactory {
private static final long serialVersionUID = -2131401304338796428L;
@@ -34,7 +35,8 @@
}
@Override
- public AbstractColumnTupleWriter createColumnWriter(IColumnMetadata columnMetadata) {
+ public AbstractColumnTupleWriter createColumnWriter(IColumnMetadata columnMetadata,
+ IColumnWriteContext writeContext) {
MergeColumnWriteMetadata mergeWriteMetadata = (MergeColumnWriteMetadata) columnMetadata;
return new MergeColumnTupleWriter(mergeWriteMetadata, pageSize, maxNumberOfTuples, tolerance, maxLeafNodeSize);
}
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
index 6b52eb7..c4f8727 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
@@ -19,8 +19,8 @@
package org.apache.asterix.column.test.bytes;
import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.HEADER_SIZE;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.MEGA_LEAF_NODE_LENGTH;
import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.NUMBER_OF_COLUMNS_OFFSET;
-import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.NUMBER_OF_COLUMN_PAGES;
import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.TUPLE_COUNT_OFFSET;
import java.io.File;
@@ -170,7 +170,7 @@
for (int i = 0; i < numberOfTuplesToWrite; i++) {
tuple.set(records.get(i % records.size()));
if (isFull(writer, tupleCount, tuple)) {
- writeFullPage(pageZero, writer, tupleCount, multiPageOp);
+ writeFullPage(pageZero, writer, tupleCount);
pageZero = allocate(pageZeroList, fileId);
tupleCount = 0;
}
@@ -180,23 +180,22 @@
//Flush remaining tuples
if (tupleCount > 0) {
- writeFullPage(pageZero, writer, tupleCount, multiPageOp);
+ writeFullPage(pageZero, writer, tupleCount);
}
return pageZeroList;
}
- protected void writeFullPage(ByteBuffer pageZero, AbstractColumnTupleWriter writer, int tupleCount,
- IColumnWriteMultiPageOp multiPageOp) throws HyracksDataException {
+ protected void writeFullPage(ByteBuffer pageZero, AbstractColumnTupleWriter writer, int tupleCount)
+ throws HyracksDataException {
pageZero.clear();
//Reserve the header space
pageZero.position(HEADER_SIZE);
- writer.flush(pageZero);
+ pageZero.putInt(MEGA_LEAF_NODE_LENGTH, writer.flush(pageZero));
//Write page header
int numberOfColumn = writer.getNumberOfColumns();
- int numberOfColumnsPages = multiPageOp.getNumberOfPersistentBuffers() - 1;
pageZero.putInt(TUPLE_COUNT_OFFSET, tupleCount);
pageZero.putInt(NUMBER_OF_COLUMNS_OFFSET, numberOfColumn);
- pageZero.putInt(NUMBER_OF_COLUMN_PAGES, numberOfColumnsPages);
+
}
protected boolean isFull(AbstractColumnTupleWriter columnWriter, int tupleCount, ITupleReference tuple) {
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
index 004c5f5..55bab43 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
@@ -51,6 +51,8 @@
DatasetUnit datasetUnit = datasets.get(datasetId);
if (datasetUnit != null && datasetUnit.dropIndex(resourceId)) {
datasets.remove(datasetId);
+
+ // TODO invalidate eviction plans if the disk is not pressured
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/FileSystemOperationDispatcherUtil.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/FileSystemOperationDispatcherUtil.java
index 6708644..cccfecb 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/FileSystemOperationDispatcherUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/FileSystemOperationDispatcherUtil.java
@@ -52,6 +52,6 @@
public static boolean isLinux() {
String os = getOSName();
- return os.contains("linux");
+ return os.toLowerCase().contains("linux");
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/LinuxFileSystemOperator.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/LinuxFileSystemOperator.java
index c8e7ee9..cb12de2 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/LinuxFileSystemOperator.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/LinuxFileSystemOperator.java
@@ -22,6 +22,7 @@
import java.lang.reflect.Field;
import java.nio.channels.FileChannel;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import jnr.ffi.LibraryLoader;
@@ -52,7 +53,7 @@
@Override
public int getFileDescriptor(FileChannel fileChannel) throws HyracksDataException {
FileDescriptor fd = getField(fileChannel, "fd", FileDescriptor.class);
- return getField(fd, "fd", int.class);
+ return getField(fd, "fd", Integer.class);
}
@Override
@@ -74,40 +75,25 @@
assert offset >= 0;
assert length > 0;
- if (length < blockSize) {
- return 0;
+ /*
+ * Punching a hole for anything less than a blockSize (usually 4KB) will not free any space. However,
+ * we have to punch a hole regardless, as readers expect certain range in the file to be either 'empty' or
+ * 'filled' and cannot be half-empty.
+ */
+ int res = libc.fallocate(fileDescriptor, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, offset, length);
+ if (res != 0) {
+ throw HyracksDataException.create(ErrorCode.ILLEGAL_STATE,
+ "Failed to punch a hole: FALLOCATE(" + res + ")");
}
- long off = offset;
- long len = length;
- // TODO maybe optimize for power of 2
- if (off % blockSize != 0) {
- long end = off + len;
- off = (off / blockSize + 1) * blockSize;
- len = end - off;
-
- if (len <= 0) {
- return 0;
- }
- }
-
- len = len / blockSize * blockSize;
-
- if (len > 0) {
- int res = libc.fallocate(fileDescriptor, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, off, len);
- if (res != 0) {
- throw new HyracksDataException("error");
- }
- }
-
- return len;
+ return length;
}
private <T> T getField(Object object, String name, Class<T> clazz) throws HyracksDataException {
try {
Field field = object.getClass().getDeclaredField(name);
field.setAccessible(true);
- return clazz.cast(field.get(field));
+ return clazz.cast(field.get(object));
} catch (NoSuchFieldException | IllegalAccessException e) {
throw HyracksDataException.create(e);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/PhysicalDrive.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/PhysicalDrive.java
index 72996b6..6a2d9c8 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/PhysicalDrive.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/PhysicalDrive.java
@@ -19,12 +19,16 @@
package org.apache.hyracks.cloud.filesystem;
import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IODeviceHandle;
import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
import org.apache.hyracks.util.StorageUtil;
@@ -35,15 +39,16 @@
@ThreadSafe
public final class PhysicalDrive implements IPhysicalDrive {
private static final Logger LOGGER = LogManager.getLogger();
- private final List<File> drivePaths;
+ private final List<FileStore> drivePaths;
private final long pressureSize;
private final AtomicBoolean pressured;
public PhysicalDrive(List<IODeviceHandle> deviceHandles, double pressureThreshold, double storagePercentage,
- long pressureDebugSize) {
+ long pressureDebugSize) throws HyracksDataException {
drivePaths = getDrivePaths(deviceHandles);
pressureSize = getPressureSize(drivePaths, pressureThreshold, storagePercentage, pressureDebugSize);
- pressured = new AtomicBoolean(getUsedSpace() <= pressureSize);
+ pressured = new AtomicBoolean();
+ computeAndCheckIsPressured();
}
@Override
@@ -64,20 +69,25 @@
private long getUsedSpace() {
long totalUsedSpace = 0;
for (int i = 0; i < drivePaths.size(); i++) {
- File device = drivePaths.get(i);
- totalUsedSpace += device.getTotalSpace() - device.getFreeSpace();
+ FileStore device = drivePaths.get(i);
+ try {
+ totalUsedSpace += getTotalSpace(device) - getUsableSpace(device);
+ } catch (HyracksDataException e) {
+ LOGGER.warn("Cannot get used space", e);
+ }
}
return totalUsedSpace;
}
- private static long getPressureSize(List<File> drivePaths, double pressureThreshold, double storagePercentage,
- long pressureDebugSize) {
+ private static long getPressureSize(List<FileStore> drivePaths, double pressureThreshold, double storagePercentage,
+ long pressureDebugSize) throws HyracksDataException {
long totalCapacity = 0;
long totalUsedSpace = 0;
- for (File drive : drivePaths) {
- totalCapacity += drive.getTotalSpace();
- totalUsedSpace += drive.getTotalSpace() - drive.getFreeSpace();
+ for (FileStore drive : drivePaths) {
+ long totalSpace = getTotalSpace(drive);
+ totalCapacity += totalSpace;
+ totalUsedSpace += totalSpace - getUsableSpace(drive);
}
long allocatedCapacity = (long) (totalCapacity * storagePercentage);
@@ -92,19 +102,41 @@
return pressureCapacity;
}
- private static List<File> getDrivePaths(List<IODeviceHandle> deviceHandles) {
- File[] roots = File.listRoots();
- Set<File> distinctUsedRoots = new HashSet<>();
+ private static List<FileStore> getDrivePaths(List<IODeviceHandle> deviceHandles) throws HyracksDataException {
+ Set<String> distinctDrives = new HashSet<>();
+ List<FileStore> fileStores = new ArrayList<>();
for (IODeviceHandle handle : deviceHandles) {
- File handlePath = handle.getMount();
- for (File root : roots) {
- if (handlePath.getAbsolutePath().startsWith(root.getAbsolutePath())
- && !distinctUsedRoots.contains(root)) {
- distinctUsedRoots.add(root);
- break;
- }
+ FileStore fileStore = createFileStore(handle.getMount());
+ String driveName = fileStore.name();
+ if (!distinctDrives.contains(driveName)) {
+ fileStores.add(fileStore);
+ distinctDrives.add(driveName);
}
}
- return new ArrayList<>(distinctUsedRoots);
+ return fileStores;
+ }
+
+ private static FileStore createFileStore(File root) throws HyracksDataException {
+ try {
+ return Files.getFileStore(root.toPath());
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ private static long getTotalSpace(FileStore fileStore) throws HyracksDataException {
+ try {
+ return fileStore.getTotalSpace();
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ private static long getUsableSpace(FileStore fileStore) throws HyracksDataException {
+ try {
+ return fileStore.getUsableSpace();
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
index 36c4a12..e5d368c 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
@@ -160,13 +160,10 @@
return;
}
- if (diskCacheManager.isSweepRequirePlanning()) {
- // Manager require planning
- diskCacheManager.prepareSweepPlan();
+ if (diskCacheManager.prepareSweepPlan()) {
+ // The Index sweep planner determined that a sweep can be performed. Sweep.
+ diskCacheManager.sweep(context);
}
- // Currently, we always sweep.
- // But we in the future we can only do a planning and sweep with another request
- diskCacheManager.sweep(context);
} finally {
indexUnit.finishedSweeping();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeNSMBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeNSMBulkLoader.java
index 04c84e1..f9186fe 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeNSMBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeNSMBulkLoader.java
@@ -33,6 +33,8 @@
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheWriteContext;
+import org.apache.hyracks.storage.common.buffercache.context.write.DefaultBufferCacheWriteContext;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
import org.apache.hyracks.util.JSONUtil;
import org.apache.logging.log4j.LogManager;
@@ -48,12 +50,13 @@
public BTreeNSMBulkLoader(float fillFactor, boolean verifyInput, IPageWriteCallback callback, ITreeIndex index)
throws HyracksDataException {
- this(fillFactor, verifyInput, callback, index, index.getLeafFrameFactory().createFrame());
+ this(fillFactor, verifyInput, callback, index, index.getLeafFrameFactory().createFrame(),
+ DefaultBufferCacheWriteContext.INSTANCE);
}
protected BTreeNSMBulkLoader(float fillFactor, boolean verifyInput, IPageWriteCallback callback, ITreeIndex index,
- ITreeIndexFrame leafFrame) throws HyracksDataException {
- super(fillFactor, callback, index, leafFrame);
+ ITreeIndexFrame leafFrame, IBufferCacheWriteContext writeContext) throws HyracksDataException {
+ super(fillFactor, callback, index, leafFrame, writeContext);
this.verifyInput = verifyInput;
splitKey = new BTreeSplitKey(tupleWriter.createTupleReference());
splitKey.getTuple().setFieldCount(cmp.getKeyFieldCount());
@@ -276,4 +279,4 @@
e.addSuppressed(t);
}
}
-}
\ No newline at end of file
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTree.java
index 9bbd056..df4ed47 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTree.java
@@ -40,6 +40,8 @@
import org.apache.hyracks.storage.common.NoOpIndexCursorStats;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
public class DiskBTree extends BTree {
@@ -72,8 +74,8 @@
}
}
- private void search(ITreeIndexCursor cursor, ISearchPredicate searchPred, BTreeOpContext ctx)
- throws HyracksDataException {
+ private void search(ITreeIndexCursor cursor, ISearchPredicate searchPred, BTreeOpContext ctx,
+ IBufferCacheReadContext bcOpCtx) throws HyracksDataException {
ctx.reset();
RangePredicate rangePredicate = (RangePredicate) searchPred;
ctx.setPred(rangePredicate);
@@ -101,8 +103,8 @@
}
}
}
- ICachedPage rootNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(getFileId(), rootPage));
- searchDown(rootNode, rootPage, ctx, cursor);
+ ICachedPage rootNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(getFileId(), rootPage), bcOpCtx);
+ searchDown(rootNode, rootPage, ctx, cursor, bcOpCtx);
}
private boolean fitInPage(ITupleReference key, MultiComparator comparator, ITreeIndexFrame frame)
@@ -113,8 +115,8 @@
return cmp <= 0;
}
- private void searchDown(ICachedPage page, int pageId, BTreeOpContext ctx, ITreeIndexCursor cursor)
- throws HyracksDataException {
+ private void searchDown(ICachedPage page, int pageId, BTreeOpContext ctx, ITreeIndexCursor cursor,
+ IBufferCacheReadContext bcOpCtx) throws HyracksDataException {
ICachedPage currentPage = page;
ctx.getInteriorFrame().setPage(currentPage);
try {
@@ -122,10 +124,9 @@
while (!ctx.getInteriorFrame().isLeaf()) {
// walk down the tree until we find the leaf
childPageId = ctx.getInteriorFrame().getChildPageId(ctx.getPred());
- bufferCache.unpin(currentPage);
- pageId = childPageId;
+ bufferCache.unpin(currentPage, bcOpCtx);
- currentPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(getFileId(), childPageId));
+ currentPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(getFileId(), childPageId), bcOpCtx);
ctx.getInteriorFrame().setPage(currentPage);
}
@@ -137,7 +138,7 @@
cursor.open(ctx.getCursorInitialState(), ctx.getPred());
} catch (Exception e) {
if (!ctx.isExceptionHandled() && currentPage != null) {
- bufferCache.unpin(currentPage);
+ bufferCache.unpin(currentPage, bcOpCtx);
}
ctx.setExceptionHandled(true);
throw HyracksDataException.create(e);
@@ -191,7 +192,7 @@
@Override
public void search(IIndexCursor cursor, ISearchPredicate searchPred) throws HyracksDataException {
ctx.setOperation(IndexOperation.SEARCH);
- ((DiskBTree) btree).search((ITreeIndexCursor) cursor, searchPred, ctx);
+ ((DiskBTree) btree).search((ITreeIndexCursor) cursor, searchPred, ctx, getBufferCacheOperationContext());
}
@Override
@@ -205,6 +206,10 @@
ctx.setOperation(IndexOperation.DISKORDERSCAN);
((DiskBTree) btree).diskOrderScan(cursor, ctx);
}
+
+ protected IBufferCacheReadContext getBufferCacheOperationContext() {
+ return DefaultBufferCacheReadContextProvider.DEFAULT;
+ }
}
private class DiskBTreeDiskScanCursor extends TreeIndexDiskOrderScanCursor {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index ff1011a..9f6ce78 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -226,6 +226,10 @@
return cmpFactories;
}
+ public int getBulkloadLeafStart() {
+ return bulkloadLeafStart;
+ }
+
@Override
public String toString() {
return "{\"class\":\"" + getClass().getSimpleName() + "\",\"file\":\"" + file.getRelativePath() + "\"}";
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java
index 6a2f43f..d5f836c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java
@@ -36,6 +36,7 @@
import org.apache.hyracks.storage.common.buffercache.IFIFOPageWriter;
import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
import org.apache.hyracks.storage.common.buffercache.PageWriteFailureCallback;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheWriteContext;
import org.apache.hyracks.storage.common.buffercache.context.write.DefaultBufferCacheWriteContext;
import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
@@ -64,11 +65,12 @@
protected AbstractTreeIndexBulkLoader(float fillFactor, IPageWriteCallback callback, ITreeIndex index)
throws HyracksDataException {
- this(fillFactor, callback, index, index.getLeafFrameFactory().createFrame());
+ this(fillFactor, callback, index, index.getLeafFrameFactory().createFrame(),
+ DefaultBufferCacheWriteContext.INSTANCE);
}
protected AbstractTreeIndexBulkLoader(float fillFactor, IPageWriteCallback callback, ITreeIndex index,
- ITreeIndexFrame leafFrame) throws HyracksDataException {
+ ITreeIndexFrame leafFrame, IBufferCacheWriteContext writeContext) throws HyracksDataException {
this.bufferCache = index.getBufferCache();
this.freePageManager = index.getPageManager();
this.fileId = index.getFileId();
@@ -77,7 +79,7 @@
interiorFrame = treeIndex.getInteriorFrameFactory().createFrame();
metaFrame = freePageManager.createMetadataFrame();
- pageWriter = bufferCache.createFIFOWriter(callback, this, DefaultBufferCacheWriteContext.INSTANCE);
+ pageWriter = bufferCache.createFIFOWriter(callback, this, writeContext);
if (!treeIndex.isEmptyTree(leafFrame)) {
throw HyracksDataException.create(ErrorCode.CANNOT_BULK_LOAD_NON_EMPTY_TREE);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
index fcf71de..43aa873 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
@@ -88,6 +88,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-control-nc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleReaderWriterFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleReaderWriterFactory.java
index 6ba04ba..ff261d9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleReaderWriterFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleReaderWriterFactory.java
@@ -21,6 +21,7 @@
import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriterFactory;
import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
/**
@@ -53,8 +54,10 @@
* Create columnar tuple writer
*
* @param columnMetadata writer column metadata
+ * @param writeContext write context
*/
- public abstract AbstractColumnTupleWriter createColumnWriter(IColumnMetadata columnMetadata);
+ public abstract AbstractColumnTupleWriter createColumnWriter(IColumnMetadata columnMetadata,
+ IColumnWriteContext writeContext);
/**
* Create columnar tuple reader
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
index 0c19ce7..6d26a45 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
@@ -84,7 +84,7 @@
/**
* Flush all columns from the internal buffers to the page buffer
*
- * @return the allocated space used to write tuples
+ * @return total flushed length (including page zero)
*/
public abstract int flush(ByteBuffer pageZero) throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnManager.java
index 278ea03..2d78821 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnManager.java
@@ -25,6 +25,12 @@
import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
public interface IColumnManager {
+
+ /**
+ * @return number of primary keys
+ */
+ int getNumberOfPrimaryKeys();
+
/**
* Activate the columnar manager for an empty dataset
*
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnMetadata.java
index 4c23b97..f11ef9a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnMetadata.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnMetadata.java
@@ -34,6 +34,11 @@
IValueReference serializeColumnsMetadata() throws HyracksDataException;
/**
+ * @return number of columns
+ */
+ int getNumberOfColumns();
+
+ /**
* abort in case of an error. This should clean up any artifact
*/
void abort() throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/IColumnIndexDiskCacheManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/IColumnIndexDiskCacheManager.java
new file mode 100644
index 0000000..6384555
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/IColumnIndexDiskCacheManager.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud;
+
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnReadContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeBulkloader;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeRangeSearchCursor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
+import org.apache.hyracks.storage.am.lsm.common.cloud.IIndexDiskCacheManager;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheWriteContext;
+
+/**
+ * Extends {@link IIndexDiskCacheManager} to provide columnar-specific local disk caching operations
+ */
+public interface IColumnIndexDiskCacheManager extends IIndexDiskCacheManager {
+ /**
+ * By activating, an initial statistics about the stored columns will be gathered
+ *
+ * @param numberOfColumns number of columns
+ * @param diskComponents current disk components
+ * @param bufferCache buffer cache
+ */
+ void activate(int numberOfColumns, List<ILSMDiskComponent> diskComponents, IBufferCache bufferCache)
+ throws HyracksDataException;
+
+ /**
+ * Create {@link IBufferCacheWriteContext} context for {@link ColumnBTreeBulkloader}
+ *
+ * @param numberOfColumns a hint of the known current number of columns
+ * @param operationType operation type
+ * @return writer context
+ */
+ IColumnWriteContext createWriteContext(int numberOfColumns, LSMIOOperationType operationType);
+
+ /**
+ * Create {@link IBufferCacheReadContext} for {@link ColumnBTreeRangeSearchCursor}
+ *
+ * @param projectionInfo projected columns information
+ * @return reader context
+ */
+ IColumnReadContext createReadContext(IColumnProjectionInfo projectionInfo);
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/NoOpColumnIndexDiskCacheManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/NoOpColumnIndexDiskCacheManager.java
new file mode 100644
index 0000000..e6fb1b2
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/NoOpColumnIndexDiskCacheManager.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud;
+
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnReadContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read.DefaultColumnReadContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.write.DefaultColumnWriteContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.ISweepContext;
+
+public final class NoOpColumnIndexDiskCacheManager implements IColumnIndexDiskCacheManager {
+
+ public static final IColumnIndexDiskCacheManager INSTANCE = new NoOpColumnIndexDiskCacheManager();
+
+ private NoOpColumnIndexDiskCacheManager() {
+ }
+
+ @Override
+ public void activate(int numberOfColumns, List<ILSMDiskComponent> diskComponents, IBufferCache bufferCache)
+ throws HyracksDataException {
+ // NoOp
+ }
+
+ @Override
+ public IColumnWriteContext createWriteContext(int numberOfColumns, LSMIOOperationType operationType) {
+ return DefaultColumnWriteContext.INSTANCE;
+ }
+
+ @Override
+ public IColumnReadContext createReadContext(IColumnProjectionInfo projectionInfo) {
+ return DefaultColumnReadContext.INSTANCE;
+ }
+
+ @Override
+ public boolean isActive() {
+ return false;
+ }
+
+ @Override
+ public boolean isSweepable() {
+ return false;
+ }
+
+ @Override
+ public boolean prepareSweepPlan() {
+ return false;
+ }
+
+ @Override
+ public long sweep(ISweepContext context) {
+ return 0;
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/IColumnReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/IColumnReadContext.java
new file mode 100644
index 0000000..86a650b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/IColumnReadContext.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+
+/**
+ * Extends {@link IBufferCacheReadContext} to provide columnar-specific {@link IBufferCache} operations
+ */
+public interface IColumnReadContext extends IBufferCacheReadContext {
+ /**
+ * Pin the next Mega-leaf node
+ * Notes:
+ * - This method is responsible for unpinning the previous pageZero of the leafFrame as well as any other pages
+ * - This method may prefetch the next mega-leaf node of the newly pinned mega-leaf node
+ *
+ * @param leafFrame leaf frame used
+ * @param bufferCache buffer cache
+ * @param fileId file ID
+ * @return the pageZero of the next mega-leaf nodes
+ */
+ ICachedPage pinNext(ColumnBTreeReadLeafFrame leafFrame, IBufferCache bufferCache, int fileId)
+ throws HyracksDataException;
+
+ /**
+ * Prepare the columns' pages
+ * Notes:
+ * - Calling this method does not guarantee the columns' pages will be pinned. Thus, it is the
+ * {@link IColumnTupleIterator} responsibility to pin the required pages of the requested columns
+ * - Calling this method may result in reading pages from the cloud and also persisting them in
+ * the local drive (only in the cloud deployment)
+ *
+ * @param leafFrame leaf frame used
+ * @param bufferCache buffer cache
+ * @param fileId file ID
+ */
+ void prepareColumns(ColumnBTreeReadLeafFrame leafFrame, IBufferCache bufferCache, int fileId)
+ throws HyracksDataException;
+
+ /**
+ * Release all pinned pages
+ *
+ * @param bufferCache buffer cache
+ */
+ void release(IBufferCache bufferCache) throws HyracksDataException;
+
+ /**
+ * Closing this context will unpin all pinned (and prefetched pages if any)
+ *
+ * @param bufferCache buffer cache
+ */
+ void close(IBufferCache bufferCache) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/IColumnWriteContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/IColumnWriteContext.java
new file mode 100644
index 0000000..88a7cad
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/IColumnWriteContext.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache;
+
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheWriteContext;
+
+/**
+ * Extends {@link IBufferCacheWriteContext} to provide columnar-specific {@link IBufferCache} operations
+ */
+public interface IColumnWriteContext extends IBufferCacheWriteContext {
+ /**
+ * Signal a column will be written
+ *
+ * @param columnIndex column index that will be written
+ * @param overlapping whether the first page is shared (overlapped) with the previous column
+ */
+ void startWritingColumn(int columnIndex, boolean overlapping);
+
+ /**
+ * Report the end of the writing operation of a column
+ *
+ * @param columnIndex of the column was written
+ * @param size the actual size of the column
+ */
+ void endWritingColumn(int columnIndex, int size);
+
+ /**
+ * Indicates that all columns were persisted
+ */
+ void columnsPersisted();
+
+ /**
+ * Closing the context and report any required statistics
+ */
+ void close();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/DefaultColumnReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/DefaultColumnReadContext.java
new file mode 100644
index 0000000..591df4a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/DefaultColumnReadContext.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
+
+import static org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider.DEFAULT;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnReadContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+
+public final class DefaultColumnReadContext implements IColumnReadContext {
+ public static final IColumnReadContext INSTANCE = new DefaultColumnReadContext();
+
+ private DefaultColumnReadContext() {
+ }
+
+ @Override
+ public void onPin(ICachedPage page) {
+ // NoOp
+ }
+
+ @Override
+ public void onUnpin(ICachedPage page) {
+ // NoOp
+ }
+
+ @Override
+ public boolean isNewPage() {
+ return false;
+ }
+
+ @Override
+ public boolean incrementStats() {
+ return true;
+ }
+
+ @Override
+ public ByteBuffer processHeader(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
+ CachedPage cPage) throws HyracksDataException {
+ return DEFAULT.processHeader(ioManager, fileHandle, header, cPage);
+ }
+
+ @Override
+ public ICachedPage pinNext(ColumnBTreeReadLeafFrame leafFrame, IBufferCache bufferCache, int fileId)
+ throws HyracksDataException {
+ ICachedPage nextPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, leafFrame.getNextLeaf()));
+ bufferCache.unpin(leafFrame.getPage());
+ leafFrame.setPage(nextPage);
+ return nextPage;
+ }
+
+ @Override
+ public void prepareColumns(ColumnBTreeReadLeafFrame leafFrame, IBufferCache bufferCache, int fileId) {
+ // NoOp
+ }
+
+ @Override
+ public void release(IBufferCache bufferCache) throws HyracksDataException {
+ // NoOp
+ }
+
+ @Override
+ public void close(IBufferCache bufferCache) throws HyracksDataException {
+ // NoOp
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/write/DefaultColumnWriteContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/write/DefaultColumnWriteContext.java
new file mode 100644
index 0000000..d326bee
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/write/DefaultColumnWriteContext.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.write;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
+import org.apache.hyracks.storage.common.buffercache.context.write.DefaultBufferCacheWriteContext;
+
+public final class DefaultColumnWriteContext implements IColumnWriteContext {
+ public static final IColumnWriteContext INSTANCE = new DefaultColumnWriteContext();
+
+ private DefaultColumnWriteContext() {
+ }
+
+ @Override
+ public void startWritingColumn(int columnIndex, boolean overlapping) {
+ // NoOp
+ }
+
+ @Override
+ public void endWritingColumn(int columnIndex, int size) {
+ // NoOp
+ }
+
+ @Override
+ public void columnsPersisted() {
+ // NoOp
+ }
+
+ @Override
+ public void close() {
+ // NoOp
+ }
+
+ @Override
+ public int write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer data)
+ throws HyracksDataException {
+ return DefaultBufferCacheWriteContext.INSTANCE.write(ioManager, handle, offset, data);
+ }
+
+ @Override
+ public long write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer[] data)
+ throws HyracksDataException {
+ return DefaultBufferCacheWriteContext.INSTANCE.write(ioManager, handle, offset, data);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/AbstractColumnBTreeLeafFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/AbstractColumnBTreeLeafFrame.java
index 6ecd59e..a069a1c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/AbstractColumnBTreeLeafFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/AbstractColumnBTreeLeafFrame.java
@@ -59,10 +59,10 @@
* @see AbstractColumnTupleWriter#getColumnOffsetsSize()
*/
public static final int SIZE_OF_COLUMNS_OFFSETS_OFFSET = RIGHT_MOST_KEY_OFFSET + 4;
- //Total number of columns pages
- public static final int NUMBER_OF_COLUMN_PAGES = SIZE_OF_COLUMNS_OFFSETS_OFFSET + 4;
- //A flag (used in NSM to indicate small and large pages). We can reuse it as explained above
- public static final int FLAG_OFFSET = NUMBER_OF_COLUMN_PAGES + 4;
+ // Length of the mega leaf node in bytes (including pageZero)
+ public static final int MEGA_LEAF_NODE_LENGTH = SIZE_OF_COLUMNS_OFFSETS_OFFSET + 4;
+ // A flag (used in NSM to indicate small and large pages). We can reuse it as explained above
+ public static final int FLAG_OFFSET = MEGA_LEAF_NODE_LENGTH + 4;
public static final int NEXT_LEAF_OFFSET = FLAG_OFFSET + 1;
public static final int HEADER_SIZE = NEXT_LEAF_OFFSET + 4;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTree.java
index fcee22c..a7a07be 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTree.java
@@ -28,12 +28,15 @@
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnMetadata;
import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnReadContext;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.NoOpIndexCursorStats;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
public class ColumnBTree extends DiskBTree {
public ColumnBTree(IBufferCache bufferCache, IPageManager freePageManager,
@@ -49,46 +52,60 @@
}
public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, IPageWriteCallback callback,
- IColumnMetadata columnMetadata) throws HyracksDataException {
+ IColumnMetadata columnMetadata, IColumnWriteContext writeContext) throws HyracksDataException {
ColumnBTreeLeafFrameFactory columnLeafFrameFactory = (ColumnBTreeLeafFrameFactory) leafFrameFactory;
- ColumnBTreeWriteLeafFrame writeLeafFrame = columnLeafFrameFactory.createWriterFrame(columnMetadata);
- return new ColumnBTreeBulkloader(fillFactor, verifyInput, callback, this, writeLeafFrame);
+ ColumnBTreeWriteLeafFrame writeLeafFrame =
+ columnLeafFrameFactory.createWriterFrame(columnMetadata, writeContext);
+ return new ColumnBTreeBulkloader(fillFactor, verifyInput, callback, this, writeLeafFrame, writeContext);
}
@Override
public BTreeAccessor createAccessor(IIndexAccessParameters iap) {
- throw new IllegalArgumentException("Use createAccessor(IIndexAccessParameters, int, IColumnTupleProjector)");
+ throw new IllegalArgumentException(
+ "Use createAccessor(IIndexAccessParameters, int, IColumnProjectionInfo, IColumnReadContext)");
}
- public BTreeAccessor createAccessor(IIndexAccessParameters iap, int index, IColumnProjectionInfo projectionInfo) {
- return new ColumnBTreeAccessor(this, iap, index, projectionInfo);
+ public BTreeAccessor createAccessor(IIndexAccessParameters iap, int index, IColumnProjectionInfo projectionInfo,
+ IColumnReadContext context) {
+ return new ColumnBTreeAccessor(this, iap, index, projectionInfo, context);
}
public class ColumnBTreeAccessor extends DiskBTreeAccessor {
private final int index;
private final IColumnProjectionInfo projectionInfo;
+ private final IColumnReadContext context;
public ColumnBTreeAccessor(ColumnBTree btree, IIndexAccessParameters iap, int index,
- IColumnProjectionInfo projectionInfo) {
+ IColumnProjectionInfo projectionInfo, IColumnReadContext context) {
super(btree, iap);
this.index = index;
this.projectionInfo = projectionInfo;
+ this.context = context;
}
@Override
public ITreeIndexCursor createSearchCursor(boolean exclusive) {
ColumnBTreeLeafFrameFactory columnLeafFrameFactory = (ColumnBTreeLeafFrameFactory) leafFrameFactory;
ColumnBTreeReadLeafFrame readLeafFrame = columnLeafFrameFactory.createReadFrame(projectionInfo);
- return new ColumnBTreeRangeSearchCursor(readLeafFrame, (IIndexCursorStats) iap.getParameters()
- .getOrDefault(HyracksConstants.INDEX_CURSOR_STATS, NoOpIndexCursorStats.INSTANCE), index);
+ return new ColumnBTreeRangeSearchCursor(
+ readLeafFrame, (IIndexCursorStats) iap.getParameters()
+ .getOrDefault(HyracksConstants.INDEX_CURSOR_STATS, NoOpIndexCursorStats.INSTANCE),
+ index, context);
}
@Override
public ITreeIndexCursor createPointCursor(boolean exclusive, boolean stateful) {
ColumnBTreeLeafFrameFactory columnLeafFrameFactory = (ColumnBTreeLeafFrameFactory) leafFrameFactory;
ColumnBTreeReadLeafFrame readLeafFrame = columnLeafFrameFactory.createReadFrame(projectionInfo);
- return new ColumnBTreePointSearchCursor(readLeafFrame, (IIndexCursorStats) iap.getParameters()
- .getOrDefault(HyracksConstants.INDEX_CURSOR_STATS, NoOpIndexCursorStats.INSTANCE), index);
+ return new ColumnBTreePointSearchCursor(
+ readLeafFrame, (IIndexCursorStats) iap.getParameters()
+ .getOrDefault(HyracksConstants.INDEX_CURSOR_STATS, NoOpIndexCursorStats.INSTANCE),
+ index, context);
+ }
+
+ @Override
+ protected IBufferCacheReadContext getBufferCacheOperationContext() {
+ return context;
}
}
}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
index 51e8c09..ee2077f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
@@ -33,10 +33,12 @@
import org.apache.hyracks.storage.am.common.impls.NodeFrontier;
import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
import org.apache.hyracks.storage.common.buffercache.CachedPage;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheWriteContext;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -48,6 +50,7 @@
private final ColumnBTreeWriteLeafFrame columnarFrame;
private final AbstractColumnTupleWriter columnWriter;
private final ISplitKey lowKey;
+ private final IColumnWriteContext columnWriteContext;
private boolean setLowKey;
private int tupleCount;
@@ -59,10 +62,11 @@
private int maxTupleCount;
public ColumnBTreeBulkloader(float fillFactor, boolean verifyInput, IPageWriteCallback callback, ITreeIndex index,
- ITreeIndexFrame leafFrame) throws HyracksDataException {
- super(fillFactor, verifyInput, callback, index, leafFrame);
+ ITreeIndexFrame leafFrame, IBufferCacheWriteContext writeContext) throws HyracksDataException {
+ super(fillFactor, verifyInput, callback, index, leafFrame, writeContext);
columnsPages = new ArrayList<>();
tempConfiscatedPages = new ArrayList<>();
+ columnWriteContext = (IColumnWriteContext) writeContext;
columnarFrame = (ColumnBTreeWriteLeafFrame) leafFrame;
columnWriter = columnarFrame.getColumnTupleWriter();
columnWriter.init(this);
@@ -128,7 +132,7 @@
public void end() throws HyracksDataException {
if (tupleCount > 0) {
splitKey.getTuple().resetByTupleOffset(splitKey.getBuffer().array(), 0);
- columnarFrame.flush(columnWriter, tupleCount, this, lowKey.getTuple(), splitKey.getTuple());
+ columnarFrame.flush(columnWriter, tupleCount, lowKey.getTuple(), splitKey.getTuple());
}
columnWriter.close();
//We are done, return any temporary confiscated pages
@@ -152,7 +156,7 @@
splitKey.setLeftPage(leafFrontier.pageId);
if (tupleCount > 0) {
//We need to flush columns to confiscate all columns pages first before calling propagateBulk
- columnarFrame.flush(columnWriter, tupleCount, this, lowKey.getTuple(), splitKey.getTuple());
+ columnarFrame.flush(columnWriter, tupleCount, lowKey.getTuple(), splitKey.getTuple());
}
propagateBulk(1, pagesToWrite);
@@ -209,6 +213,8 @@
numberOfPagesInCurrentLeafNode += numberOfPagesInPersistedColumn;
columnsPages.clear();
+ // Indicate to the columnWriteContext that all columns were persisted
+ columnWriteContext.columnsPersisted();
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeLeafFrameFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeLeafFrameFactory.java
index 31d85bd..93e68b0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeLeafFrameFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeLeafFrameFactory.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnMetadata;
import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
public class ColumnBTreeLeafFrameFactory implements ITreeIndexFrameFactory {
private static final long serialVersionUID = 4136035898137820322L;
@@ -51,9 +52,11 @@
return rowTupleWriterFactory;
}
- public ColumnBTreeWriteLeafFrame createWriterFrame(IColumnMetadata columnMetadata) {
+ public ColumnBTreeWriteLeafFrame createWriterFrame(IColumnMetadata columnMetadata,
+ IColumnWriteContext writeContext) {
ITreeIndexTupleWriter rowTupleWriter = rowTupleWriterFactory.createTupleWriter();
- AbstractColumnTupleWriter columnTupleWriter = columnTupleWriterFactory.createColumnWriter(columnMetadata);
+ AbstractColumnTupleWriter columnTupleWriter =
+ columnTupleWriterFactory.createColumnWriter(columnMetadata, writeContext);
return new ColumnBTreeWriteLeafFrame(rowTupleWriter, columnTupleWriter);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreePointSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreePointSearchCursor.java
index db878c4..f64a4c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreePointSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreePointSearchCursor.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
import org.apache.hyracks.storage.am.common.ophelpers.FindTupleMode;
import org.apache.hyracks.storage.am.common.ophelpers.FindTupleNoExactMatchPolicy;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnReadContext;
import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
@@ -31,8 +32,9 @@
public class ColumnBTreePointSearchCursor extends ColumnBTreeRangeSearchCursor
implements IDiskBTreeStatefulPointSearchCursor {
- public ColumnBTreePointSearchCursor(ColumnBTreeReadLeafFrame frame, IIndexCursorStats stats, int index) {
- super(frame, stats, index);
+ public ColumnBTreePointSearchCursor(ColumnBTreeReadLeafFrame frame, IIndexCursorStats stats, int index,
+ IColumnReadContext context) {
+ super(frame, stats, index, context);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
index abac582..2669d4b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.storage.am.common.ophelpers.FindTupleNoExactMatchPolicy;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnReadContext;
import org.apache.hyracks.storage.common.EnforcedIndexCursor;
import org.apache.hyracks.storage.common.ICursorInitialState;
import org.apache.hyracks.storage.common.IIndexCursorStats;
@@ -41,6 +42,7 @@
implements ITreeIndexCursor, IColumnReadMultiPageOp {
protected final ColumnBTreeReadLeafFrame frame;
+ private final IColumnReadContext context;
protected final IColumnTupleIterator frameTuple;
protected IBufferCache bufferCache = null;
@@ -64,8 +66,10 @@
protected final IIndexCursorStats stats;
- public ColumnBTreeRangeSearchCursor(ColumnBTreeReadLeafFrame frame, IIndexCursorStats stats, int index) {
+ public ColumnBTreeRangeSearchCursor(ColumnBTreeReadLeafFrame frame, IIndexCursorStats stats, int index,
+ IColumnReadContext context) {
this.frame = frame;
+ this.context = context;
this.frameTuple = frame.createTupleReference(index, this);
this.reusablePredicate = new RangePredicate();
this.stats = stats;
@@ -83,14 +87,12 @@
return frameTuple;
}
- private void fetchNextLeafPage(int leafPage) throws HyracksDataException {
- int nextLeafPage = leafPage;
+ private void fetchNextLeafPage() throws HyracksDataException {
+ int nextLeafPage;
do {
- ICachedPage nextLeaf = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, nextLeafPage));
+ page0 = context.pinNext(frame, bufferCache, fileId);
stats.getPageCounter().update(1);
- bufferCache.unpin(page0);
- page0 = nextLeaf;
- frame.setPage(page0);
+ context.prepareColumns(frame, bufferCache, fileId);
frameTuple.newPage();
setCursorPosition();
nextLeafPage = frame.getNextLeaf();
@@ -104,7 +106,7 @@
frameTuple.lastTupleReached();
nextLeafPage = frame.getNextLeaf();
if (nextLeafPage >= 0) {
- fetchNextLeafPage(nextLeafPage);
+ fetchNextLeafPage();
} else {
return false;
}
@@ -129,6 +131,7 @@
frame.setPage(page0);
frame.setMultiComparator(originalKeyCmp);
if (frame.getTupleCount() > 0) {
+ context.prepareColumns(frame, bufferCache, fileId);
frameTuple.newPage();
initCursorPosition(searchPred);
} else {
@@ -181,9 +184,10 @@
protected void releasePages() throws HyracksDataException {
//Unpin all column pages first
+ context.release(bufferCache);
frameTuple.unpinColumnsPages();
if (page0 != null) {
- bufferCache.unpin(page0);
+ bufferCache.unpin(page0, context);
}
}
@@ -258,6 +262,7 @@
public void doClose() throws HyracksDataException {
releasePages();
frameTuple.close();
+ context.close(bufferCache);
page0 = null;
pred = null;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
index 8872613..d4b230a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
@@ -45,6 +45,7 @@
if (getTupleCount() == 0) {
return null;
}
+
leftMostTuple.setFieldCount(cmp.getKeyFieldCount());
leftMostTuple.resetByTupleOffset(buf.array(), buf.getInt(LEFT_MOST_KEY_OFFSET));
return leftMostTuple;
@@ -55,6 +56,7 @@
if (getTupleCount() == 0) {
return null;
}
+
rightMostTuple.setFieldCount(cmp.getKeyFieldCount());
rightMostTuple.resetByTupleOffset(buf.array(), buf.getInt(RIGHT_MOST_KEY_OFFSET));
return rightMostTuple;
@@ -84,12 +86,20 @@
return columnarTupleReader.getColumnOffset(buf, columnIndex);
}
- AbstractColumnTupleReader getColumnarTupleReader() {
- return columnarTupleReader;
+ public int getNextLeaf() {
+ return buf.getInt(NEXT_LEAF_OFFSET);
}
- int getNextLeaf() {
- return buf.getInt(NEXT_LEAF_OFFSET);
+ public long getMegaLeafNodeLengthInBytes() {
+ return buf.getInt(MEGA_LEAF_NODE_LENGTH);
+ }
+
+ public int getMegaLeafNodeNumberOfPages() {
+ return (int) Math.ceil((double) getMegaLeafNodeLengthInBytes() / buf.capacity());
+ }
+
+ public ColumnBTreeReadLeafFrame createCopy() {
+ return new ColumnBTreeReadLeafFrame(rowTupleWriter, columnarTupleReader);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java
index 275fb0e..c725084 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java
@@ -22,7 +22,6 @@
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
-import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
public class ColumnBTreeWriteLeafFrame extends AbstractColumnBTreeLeafFrame {
private final AbstractColumnTupleWriter columnTupleWriter;
@@ -41,31 +40,30 @@
buf.putInt(LEFT_MOST_KEY_OFFSET, -1);
buf.putInt(RIGHT_MOST_KEY_OFFSET, -1);
buf.putInt(SIZE_OF_COLUMNS_OFFSETS_OFFSET, 0);
- buf.putInt(NUMBER_OF_COLUMN_PAGES, 0);
+ buf.putInt(MEGA_LEAF_NODE_LENGTH, 0);
buf.put(FLAG_OFFSET, (byte) 0);
buf.putInt(NEXT_LEAF_OFFSET, -1);
}
- void flush(AbstractColumnTupleWriter columnWriter, int numberOfTuples, IColumnWriteMultiPageOp multiPageOp,
- ITupleReference minKey, ITupleReference maxKey) throws HyracksDataException {
- //Prepare the space for writing the columns' information such as the primary keys
+ void flush(AbstractColumnTupleWriter columnWriter, int numberOfTuples, ITupleReference minKey,
+ ITupleReference maxKey) throws HyracksDataException {
+ // Prepare the space for writing the columns' information such as the primary keys
buf.position(HEADER_SIZE);
- //Write the columns' information including the columns' offsets and the primary keys
- columnWriter.flush(buf);
+ // Flush the columns to persistence pages and write the length of the mega leaf node in pageZero
+ buf.putInt(MEGA_LEAF_NODE_LENGTH, columnWriter.flush(buf));
- //Write min and max keys
+ // Write min and max keys
int offset = buf.position();
buf.putInt(LEFT_MOST_KEY_OFFSET, offset);
offset += rowTupleWriter.writeTuple(minKey, buf.array(), offset);
buf.putInt(RIGHT_MOST_KEY_OFFSET, offset);
rowTupleWriter.writeTuple(maxKey, buf.array(), offset);
- //Write page information
+ // Write page information
int numberOfColumns = columnWriter.getNumberOfColumns();
buf.putInt(TUPLE_COUNT_OFFSET, numberOfTuples);
buf.putInt(NUMBER_OF_COLUMNS_OFFSET, numberOfColumns);
buf.putInt(SIZE_OF_COLUMNS_OFFSETS_OFFSET, columnWriter.getColumnOffsetsSize());
- buf.putInt(NUMBER_OF_COLUMN_PAGES, multiPageOp.getNumberOfPersistentBuffers());
}
public AbstractColumnTupleWriter getColumnTupleWriter() {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java
index e95b380..958bfde 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnManager;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnMetadata;
import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IColumnIndexDiskCacheManager;
import org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeBatchPointSearchCursor;
@@ -52,6 +53,7 @@
public class LSMColumnBTree extends LSMBTree {
private static final ICursorFactory CURSOR_FACTORY = LSMColumnBTreeSearchCursor::new;
private final IColumnManager columnManager;
+ private final IColumnIndexDiskCacheManager diskCacheManager;
private final ILSMDiskComponentFactory mergeComponentFactory;
/**
* This column metadata only used during flush and dataset bulkload operations. We cannot have more than one
@@ -70,8 +72,8 @@
double bloomFilterFalsePositiveRate, int fieldCount, IBinaryComparatorFactory[] cmpFactories,
ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
- int[] btreeFields, ITracer tracer, IColumnManager columnManager, boolean atomic)
- throws HyracksDataException {
+ int[] btreeFields, ITracer tracer, IColumnManager columnManager, boolean atomic,
+ IColumnIndexDiskCacheManager diskCacheManager) throws HyracksDataException {
super(ioManager, virtualBufferCaches, interiorFrameFactory, insertLeafFrameFactory, deleteLeafFrameFactory,
diskBufferCache, fileManager, componentFactory, bulkloadComponentFactory, null, null, null,
bloomFilterFalsePositiveRate, fieldCount, cmpFactories, mergePolicy, opTracker, ioScheduler,
@@ -79,6 +81,7 @@
atomic);
this.columnManager = columnManager;
this.mergeComponentFactory = mergeComponentFactory;
+ this.diskCacheManager = diskCacheManager;
}
@Override
@@ -90,6 +93,8 @@
IComponentMetadata componentMetadata = diskComponents.get(0).getMetadata();
columnMetadata = columnManager.activate(ColumnUtil.getColumnMetadataCopy(componentMetadata));
}
+
+ diskCacheManager.activate(columnMetadata.getNumberOfColumns(), diskComponents, diskBufferCache);
}
@Override
@@ -124,7 +129,7 @@
}
@Override
- protected ILSMDiskComponentFactory getMergeComponentFactory() {
+ public ILSMDiskComponentFactory getMergeComponentFactory() {
return mergeComponentFactory;
}
@@ -132,4 +137,9 @@
public ICursorFactory getCursorFactory() {
return CURSOR_FACTORY;
}
+
+ @Override
+ public IColumnIndexDiskCacheManager getDiskCacheManager() {
+ return diskCacheManager;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTreeOpContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTreeOpContext.java
index 8a33de1..43deeb2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTreeOpContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTreeOpContext.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnReadContext;
import org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.tuples.ColumnAwareMultiComparator;
import org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeOpContext;
@@ -81,4 +82,8 @@
}
return new ColumnAwareMultiComparator(comparators);
}
+
+ public IColumnReadContext createPageZeroContext(IColumnProjectionInfo projectionInfo) {
+ return ((LSMColumnBTree) index).getDiskCacheManager().createReadContext(projectionInfo);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTreeRangeSearchCursor.java
index be5837f..41126e2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTreeRangeSearchCursor.java
@@ -22,13 +22,16 @@
import java.util.List;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.btree.impls.BTree;
import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
+import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnReadContext;
import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTree;
import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeRangeSearchCursor;
import org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.tuples.ColumnAwareDiskOnlyMultiComparator;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeRangeSearchCursor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.common.IIndexCursor;
@@ -52,13 +55,16 @@
}
@Override
- protected BTreeAccessor createAccessor(LSMComponentType type, BTree btree, int index) throws HyracksDataException {
- if (type == LSMComponentType.MEMORY) {
- return super.createAccessor(type, btree, index);
+ protected BTreeAccessor createAccessor(ILSMComponent component, int index) throws HyracksDataException {
+ if (component.getType() == LSMComponentType.MEMORY) {
+ return super.createAccessor(component, index);
}
- ColumnBTree columnBTree = (ColumnBTree) btree;
+
+ ColumnBTree columnBTree = (ColumnBTree) component.getIndex();
LSMColumnBTreeOpContext columnOpCtx = (LSMColumnBTreeOpContext) opCtx;
- return columnBTree.createAccessor(iap, index, columnOpCtx.createProjectionInfo());
+ IColumnProjectionInfo projectionInfo = columnOpCtx.createProjectionInfo();
+ IColumnReadContext context = columnOpCtx.createPageZeroContext(projectionInfo);
+ return columnBTree.createAccessor(NoOpIndexAccessParameters.INSTANCE, index, projectionInfo, context);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTreeWithBloomFilterDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTreeWithBloomFilterDiskComponent.java
index 57e162d..8d3ae32 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTreeWithBloomFilterDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTreeWithBloomFilterDiskComponent.java
@@ -25,6 +25,8 @@
import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
import org.apache.hyracks.storage.am.btree.impls.BTree;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnMetadata;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IColumnIndexDiskCacheManager;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTree;
import org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeMergeOperation;
@@ -86,7 +88,11 @@
columnMetadata = lsmColumnBTree.getColumnManager().createMergeColumnMetadata(columnMetadataValue,
cursor.getComponentTupleList());
}
- IIndexBulkLoader bulkLoader = columnBTree.createBulkLoader(fillFactor, verifyInput, callback, columnMetadata);
+ int numberOfColumns = columnMetadata.getNumberOfColumns();
+ IColumnIndexDiskCacheManager diskCacheManager = lsmColumnBTree.getDiskCacheManager();
+ IColumnWriteContext writeContext = diskCacheManager.createWriteContext(numberOfColumns, operationType);
+ IIndexBulkLoader bulkLoader =
+ columnBTree.createBulkLoader(fillFactor, verifyInput, callback, columnMetadata, writeContext);
return new LSMColumnIndexBulkloader(bulkLoader, columnMetadata, getMetadata());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBatchPointSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBatchPointSearchCursor.java
index 65b292b..75f129c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBatchPointSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBatchPointSearchCursor.java
@@ -22,6 +22,8 @@
import org.apache.hyracks.storage.am.btree.impls.BTree;
import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnReadContext;
import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTree;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeBatchPointSearchCursor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
@@ -40,7 +42,8 @@
}
ColumnBTree columnBTree = (ColumnBTree) btree;
LSMColumnBTreeOpContext columnOpCtx = (LSMColumnBTreeOpContext) opCtx;
- return columnBTree.createAccessor(NoOpIndexAccessParameters.INSTANCE, index,
- columnOpCtx.createProjectionInfo());
+ IColumnProjectionInfo projectionInfo = columnOpCtx.createProjectionInfo();
+ IColumnReadContext context = columnOpCtx.createPageZeroContext(projectionInfo);
+ return columnBTree.createAccessor(NoOpIndexAccessParameters.INSTANCE, index, projectionInfo, context);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnPointSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnPointSearchCursor.java
index e193232..b00c3a2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnPointSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnPointSearchCursor.java
@@ -22,6 +22,8 @@
import org.apache.hyracks.storage.am.btree.impls.BTree;
import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnReadContext;
import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTree;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreePointSearchCursor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
@@ -40,7 +42,8 @@
}
ColumnBTree columnBTree = (ColumnBTree) btree;
LSMColumnBTreeOpContext columnOpCtx = (LSMColumnBTreeOpContext) opCtx;
- return columnBTree.createAccessor(NoOpIndexAccessParameters.INSTANCE, index,
- columnOpCtx.createProjectionInfo());
+ IColumnProjectionInfo projectionInfo = columnOpCtx.createProjectionInfo();
+ IColumnReadContext context = columnOpCtx.createPageZeroContext(projectionInfo);
+ return columnBTree.createAccessor(NoOpIndexAccessParameters.INSTANCE, index, projectionInfo, context);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
index 490fa2e..039aaed 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
@@ -32,7 +32,10 @@
import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory;
import org.apache.hyracks.storage.am.common.api.INullIntrospector;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnManager;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnManagerFactory;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IColumnIndexDiskCacheManager;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.NoOpColumnIndexDiskCacheManager;
import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTree;
import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeFactory;
import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeLeafFrameFactory;
@@ -65,6 +68,9 @@
ICompressorDecompressorFactory compressorDecompressorFactory, ITypeTraits nullTypeTraits,
INullIntrospector nullIntrospector, IColumnManagerFactory columnManagerFactory, boolean atomic)
throws HyracksDataException {
+ // Initialize managers
+ IColumnManager columnManager = columnManagerFactory.createColumnManager();
+ IColumnIndexDiskCacheManager diskCacheManager = NoOpColumnIndexDiskCacheManager.INSTANCE;
//Tuple writers
LSMBTreeTupleWriterFactory insertTupleWriterFactory = new LSMBTreeTupleWriterFactory(typeTraits,
@@ -111,6 +117,6 @@
deleteLeafFrameFactory, diskBufferCache, fileNameManager, flushComponentFactory, mergeComponentFactory,
bulkLoadComponentFactory, bloomFilterFalsePositiveRate, typeTraits.length, cmpFactories, mergePolicy,
opTracker, ioScheduler, ioOpCallbackFactory, pageWriteCallbackFactory, btreeFields, tracer,
- columnManagerFactory.createColumnManager(), atomic);
+ columnManager, atomic, diskCacheManager);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 968416c..f3c3b79 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -403,7 +403,7 @@
}
btree = (BTree) component.getIndex();
if (btreeAccessors[i] == null || destroyIncompatible(component, i)) {
- btreeAccessors[i] = createAccessor(type, btree, i);
+ btreeAccessors[i] = createAccessor(component, i);
rangeCursors[i] = createCursor(type, btreeAccessors[i]);
} else {
// re-use
@@ -444,7 +444,8 @@
return resultOfSearchCallbackProceed;
}
- protected BTreeAccessor createAccessor(LSMComponentType type, BTree btree, int index) throws HyracksDataException {
+ protected BTreeAccessor createAccessor(ILSMComponent component, int index) throws HyracksDataException {
+ BTree btree = (BTree) component.getIndex();
return btree.createAccessor(iap);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/cloud/DefaultIndexDiskCacheManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/cloud/DefaultIndexDiskCacheManager.java
index fd0c983..532caea 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/cloud/DefaultIndexDiskCacheManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/cloud/DefaultIndexDiskCacheManager.java
@@ -35,12 +35,7 @@
}
@Override
- public boolean isSweepRequirePlanning() {
- throw new IllegalStateException(NOT_SWEEPABLE_ERR_MSG);
- }
-
- @Override
- public void prepareSweepPlan() {
+ public boolean prepareSweepPlan() {
throw new IllegalStateException(NOT_SWEEPABLE_ERR_MSG);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/cloud/IIndexDiskCacheManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/cloud/IIndexDiskCacheManager.java
index ee393af..d3923fd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/cloud/IIndexDiskCacheManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/cloud/IIndexDiskCacheManager.java
@@ -39,14 +39,11 @@
boolean isSweepable();
/**
- * @return whether a sweep operation requires planning
- */
- boolean isSweepRequirePlanning();
-
- /**
* Prepare a sweep plan
+ *
+ * @return true if the plan determines a sweep can be performed
*/
- void prepareSweepPlan();
+ boolean prepareSweepPlan();
/**
* Sweep an index to make space in {@link IPhysicalDrive}