[ASTERIXDB-3601][STO] Supporting multi-page zeroes
- user model changes: no
- storage format changes: yes
- interface changes: yes
Details:
This patch introduces pageZero writers that can
span multiple pageZero segments, which makes it
possible to support a very large number of columns.
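A rough sketch of the segmentation idea (illustrative only;
the class, method, and parameter names below are hypothetical
and not part of this patch):

    final class PageZeroSegmentMath {
        // Estimates how many pageZero segments a batch needs, assuming the
        // zeroth segment holds maxColumnsInZerothSegment column entries and
        // every additional segment holds columnsPerSegment entries.
        static int requiredSegments(int numberOfColumns, int maxColumnsInZerothSegment, int columnsPerSegment) {
            if (numberOfColumns <= maxColumnsInZerothSegment) {
                return 1; // everything fits in the zeroth segment
            }
            int remaining = numberOfColumns - maxColumnsInZerothSegment;
            // ceiling division over the remaining column entries
            return 1 + (remaining + columnsPerSegment - 1) / columnsPerSegment;
        }
    }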
Ext-ref: MB-66306
Change-Id: I0db8588d9a05331a6542b9bc1f3ca1c342ca70a7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19987
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Ritik Raj <ritik.raj@couchbase.com>
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiPageZeroByteBuffersReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiPageZeroByteBuffersReader.java
new file mode 100644
index 0000000..bb02166
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiPageZeroByteBuffersReader.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.in;
+
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter;
+import org.apache.asterix.column.zero.writers.SparseColumnPageZeroWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IntPairUtil;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.tuples.ColumnMultiPageZeroBufferProvider;
+
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+
+public final class MultiPageZeroByteBuffersReader {
+ private static final ByteBuffer EMPTY;
+ private ColumnMultiPageZeroBufferProvider bufferProvider;
+ private final Int2IntMap segmentDir; // maps a segment index to its position in the buffers list
+ private int maxBuffersSize;
+
+ static {
+ EMPTY = ByteBuffer.allocate(0);
+ EMPTY.limit(0);
+ }
+
+ private final List<ByteBuffer> buffers;
+
+ public MultiPageZeroByteBuffersReader() {
+ this.buffers = new ArrayList<>();
+ segmentDir = new Int2IntOpenHashMap();
+ segmentDir.defaultReturnValue(-1);
+ }
+
+ public void reset(IColumnBufferProvider bufferProvider) throws HyracksDataException {
+ ColumnMultiPageZeroBufferProvider pageZeroBufferProvider = (ColumnMultiPageZeroBufferProvider) bufferProvider;
+ reset();
+ this.bufferProvider = pageZeroBufferProvider;
+ maxBuffersSize = pageZeroBufferProvider.getNumberOfRemainingPages();
+ pageZeroBufferProvider.readAll(buffers, segmentDir);
+ }
+
+ public void read(int segmentIndex, IPointable pointable, int position, int length)
+ throws EOFException, HyracksDataException {
+ if (segmentIndex < 0 || segmentIndex >= maxBuffersSize) {
+ throw new IndexOutOfBoundsException("Buffer index out of bounds: " + segmentIndex);
+ }
+
+ int bufferIndex = segmentDir.get(segmentIndex);
+ if (bufferIndex == -1) {
+ // The segment page was not pinned (with the DefaultReadContext, pages are expected to be on disk),
+ // so read the required segment and cache its buffer.
+ ByteBuffer buffer = bufferProvider.read(segmentIndex);
+ segmentDir.put(segmentIndex, buffers.size());
+ bufferIndex = buffers.size();
+ buffers.add(buffer);
+ }
+ ByteBuffer buffer = buffers.get(bufferIndex);
+ pointable.set(buffer.array(), position, length);
+ }
+
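+ // Decodes the dense (default) layout: each segment stores consecutive 4-byte column offsets,
+ // so the absolute column index is derived from the segment's position
+ // (maxColumnsInZerothSegment + segmentIndex * numberOfColumnsInAPage).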
+ public void readOffset(long[] offsetColumnIndexPairs, int maxColumnsInZerothSegment, int numberOfColumnsInAPage) {
+ int numberOfColumns = offsetColumnIndexPairs.length - 1;
+ for (Int2IntMap.Entry pair : segmentDir.int2IntEntrySet()) {
+ int segmentIndex = pair.getIntKey();
+ int bufferIndex = pair.getIntValue();
+ ByteBuffer buffer = buffers.get(bufferIndex);
+ int columnIndex = maxColumnsInZerothSegment + segmentIndex * numberOfColumnsInAPage;
+ int segmentOffset = 0;
+ for (int j = 0; j < numberOfColumnsInAPage; j++) {
+ int columnOffset = buffer.getInt(segmentOffset);
+ offsetColumnIndexPairs[columnIndex] = IntPairUtil.of(columnOffset, columnIndex);
+ segmentOffset += DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+ columnIndex++;
+ if (columnIndex == numberOfColumns) {
+ break; // No need to read more columns from this buffer.
+ }
+ }
+ }
+ }
+
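+ // Decodes the sparse layout: each entry stores a 4-byte column index followed by a 4-byte offset,
+ // so both values are read directly from the segment buffer.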
+ public void readSparseOffset(long[] offsetColumnIndexPairs, int numberOfPageSegments, int numberOfColumnsInAPage,
+ int numberOfColumnsInLastSegment) {
+ for (Int2IntMap.Entry pair : segmentDir.int2IntEntrySet()) {
+ int segmentIndex = pair.getIntKey();
+ int bufferIndex = pair.getIntValue();
+ ByteBuffer buffer = buffers.get(bufferIndex);
+ int segmentOffset = 0;
+ int numberOfColumnsInSegment =
+ segmentIndex == numberOfPageSegments - 2 ? numberOfColumnsInLastSegment : numberOfColumnsInAPage;
+ for (int j = 0; j < numberOfColumnsInSegment; j++) {
+ int columnIndex = buffer.getInt(segmentOffset);
+ int columnOffset = buffer.getInt(segmentOffset + Integer.BYTES);
+ offsetColumnIndexPairs[columnIndex] = IntPairUtil.of(columnOffset, columnIndex);
+ segmentOffset += SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+ }
+ }
+ }
+
+ public void readAllColumns(BitSet presentColumns, int numberOfPageSegments, int numberOfColumnsInAPage,
+ int numberOfColumnsInLastSegment) {
+ final int stride = SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+ final int lastSegmentIndex = numberOfPageSegments - 2;
+
+ for (Int2IntMap.Entry entry : segmentDir.int2IntEntrySet()) {
+ final int segmentIndex = entry.getIntKey();
+ final int bufferIndex = entry.getIntValue();
+ final ByteBuffer buffer = buffers.get(bufferIndex);
+
+ final int columnsInSegment =
+ (segmentIndex == lastSegmentIndex) ? numberOfColumnsInLastSegment : numberOfColumnsInAPage;
+
+ int offset = 0;
+ int limit = columnsInSegment * stride;
+
+ while (offset < limit) {
+ presentColumns.set(buffer.getInt(offset));
+ offset += stride;
+ }
+ }
+ }
+
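+ // Binary-searches a sparse segment (entries sorted by column index) for the given column,
+ // lazily reading the segment buffer if it has not been pinned yet.
+ // Returns the entry's position within the segment, or -1 if the column is not present.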
+ public int findColumnIndexInSegment(int segmentIndex, int columnIndex, int numberOfColumnsInSegment)
+ throws HyracksDataException {
+ if (segmentIndex < 0 || segmentIndex >= maxBuffersSize) {
+ throw new IndexOutOfBoundsException("Buffer index out of bounds: " + segmentIndex);
+ }
+ int bufferIndex = segmentDir.get(segmentIndex);
+ if (bufferIndex == -1) {
+ // The segment page was not pinned (with the DefaultReadContext, pages are expected to be on disk),
+ // so read the required segment and cache its buffer.
+ ByteBuffer buffer = bufferProvider.read(segmentIndex);
+ segmentDir.put(segmentIndex, buffers.size());
+ bufferIndex = buffers.size();
+ buffers.add(buffer);
+ }
+ ByteBuffer buffer = buffers.get(bufferIndex);
+ int start = 0;
+ int end = numberOfColumnsInSegment - 1;
+ while (start <= end) {
+ int mid = start + (end - start) / 2;
+ int midColumnIndex = buffer.getInt(mid * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE);
+ if (midColumnIndex == columnIndex) {
+ return mid; // found the column index
+ } else if (midColumnIndex < columnIndex) {
+ start = mid + 1;
+ } else {
+ end = mid - 1;
+ }
+ }
+
+ return -1;
+ }
+
+ public void reset() {
+ buffers.clear();
+ maxBuffersSize = 0;
+ segmentDir.clear();
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractMultiBufferBytesOutputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractMultiBufferBytesOutputStream.java
index 4b7c835..0c311f1 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractMultiBufferBytesOutputStream.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractMultiBufferBytesOutputStream.java
@@ -81,6 +81,34 @@
}
}
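+ // Random-access writes into a specific segment buffer at an absolute offset.
+ // Buffers up to bufferIndex are allocated on demand; the stream position is not advanced.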
+ public void writeInSegment(int bufferIndex, int off, int val) throws IOException {
+ if (bufferIndex >= buffers.size()) {
+ int requiredBuffers = bufferIndex - buffers.size() + 1;
+ allocateBuffers(requiredBuffers);
+ }
+ ByteBuffer buffer = buffers.get(bufferIndex);
+ buffer.putInt(off, val);
+ }
+
+ public void writeInSegment(int bufferIndex, int off, int val1, int val2) throws IOException {
+ if (bufferIndex >= buffers.size()) {
+ int requiredBuffers = bufferIndex - buffers.size() + 1;
+ allocateBuffers(requiredBuffers);
+ }
+ ByteBuffer buffer = buffers.get(bufferIndex);
+ buffer.putInt(off, val1);
+ buffer.putInt(off + Integer.BYTES, val2);
+ }
+
+ public void writeInSegment(int bufferIndex, int off, long val) throws IOException {
+ if (bufferIndex >= buffers.size()) {
+ int requiredBuffers = bufferIndex - buffers.size() + 1;
+ allocateBuffers(requiredBuffers);
+ }
+ ByteBuffer buffer = buffers.get(bufferIndex);
+ buffer.putLong(off, val);
+ }
+
@Override
public void reserveByte(IReservedPointer pointer) throws IOException {
ensureCapacity(Byte.BYTES);
@@ -161,4 +189,10 @@
allocatedBytes += size;
return size;
}
+
+ protected void allocateBuffers(int count) throws HyracksDataException {
+ for (int i = 0; i < count; i++) {
+ allocateBuffer();
+ }
+ }
}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiPersistentPageZeroBufferBytesOutputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiPersistentPageZeroBufferBytesOutputStream.java
new file mode 100644
index 0000000..c156e49
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiPersistentPageZeroBufferBytesOutputStream.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.out;
+
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+
+public final class MultiPersistentPageZeroBufferBytesOutputStream extends AbstractMultiBufferBytesOutputStream {
+ public MultiPersistentPageZeroBufferBytesOutputStream(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+ super(multiPageOpRef);
+ }
+
+ @Override
+ protected ByteBuffer confiscateNewBuffer() throws HyracksDataException {
+ return multiPageOpRef.getValue().confiscatePageZeroPersistent();
+ }
+
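+ // Confiscates exactly the number of persistent pageZero pages required for this batch's segments.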
+ public void reset(int requiredPageSegments) throws HyracksDataException {
+ preReset();
+ allocateBuffers(requiredPageSegments); // this many buffers are required for the pageZero segments
+ }
+
+ @Override
+ protected void preReset() {
+ if (allocatedBytes > 0) {
+ // This should never happen for the pageZero segments:
+ // the stream must already be finished after the flush.
+ throw new IllegalStateException("PageZero segments should already be finished after flush");
+ }
+ }
+
+ @Override
+ public void writeTo(OutputStream outputStream) {
+ throw new IllegalAccessError("Persistent stream cannot be written to another stream");
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/filter/FilterAccessorProvider.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/filter/FilterAccessorProvider.java
index e14fce3..218b2ba 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/filter/FilterAccessorProvider.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/filter/FilterAccessorProvider.java
@@ -139,16 +139,15 @@
}
public static void setFilterValues(List<IColumnRangeFilterValueAccessor> filterValueAccessors,
- ColumnBTreeReadLeafFrame frame) {
+ ColumnBTreeReadLeafFrame frame) throws HyracksDataException {
IColumnPageZeroReader pageZeroReader = frame.getColumnPageZeroReader();
for (int i = 0; i < filterValueAccessors.size(); i++) {
ColumnRangeFilterValueAccessor accessor = (ColumnRangeFilterValueAccessor) filterValueAccessors.get(i);
int columnIndex = accessor.getColumnIndex();
long normalizedValue;
if (pageZeroReader.isValidColumn(columnIndex)) {
- int filterOffset = pageZeroReader.getColumnFilterOffset(columnIndex);
- normalizedValue = accessor.isMin() ? pageZeroReader.getLong(filterOffset)
- : pageZeroReader.getLong(filterOffset + Long.BYTES);
+ normalizedValue = accessor.isMin() ? pageZeroReader.getColumnFilterMin(columnIndex)
+ : pageZeroReader.getColumnFilterMax(columnIndex);
} else {
// Column is missing
normalizedValue = accessor.isMin() ? Long.MAX_VALUE : Long.MIN_VALUE;
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
index 53a6375..46dae1b 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
@@ -19,7 +19,6 @@
package org.apache.asterix.column.operation.lsm.flush;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.BitSet;
import org.apache.asterix.column.values.IColumnValuesWriter;
@@ -29,6 +28,8 @@
import org.apache.asterix.column.zero.PageZeroWriterFlavorSelector;
import org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter;
import org.apache.asterix.column.zero.writers.SparseColumnPageZeroWriter;
+import org.apache.asterix.column.zero.writers.multipage.DefaultColumnMultiPageZeroWriter;
+import org.apache.asterix.column.zero.writers.multipage.SparseColumnMultiPageZeroWriter;
import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
import org.apache.asterix.om.lazy.TypedRecordLazyVisitablePointable;
import org.apache.commons.lang3.mutable.Mutable;
@@ -144,18 +145,37 @@
}
@Override
- public int getColumnOccupiedSpace(boolean includeCurrentTupleColumns) {
- int presentColumns = transformerForCurrentTuple.getNumberOfVisitedColumnsInBatch();
+ public int getPageZeroWriterOccupiedSpace(int maxColumnsInPageZerothSegment, boolean includeCurrentTupleColumns,
+ boolean adaptive) {
int totalNumberOfColumns = getAbsoluteNumberOfColumns(includeCurrentTupleColumns);
+ totalNumberOfColumns = Math.min(totalNumberOfColumns, maxColumnsInPageZerothSegment);
+
+ int spaceOccupiedByDefaultWriter = DefaultColumnMultiPageZeroWriter.EXTENDED_HEADER_SIZE + totalNumberOfColumns
+ * (DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
+
+ if (!adaptive) {
+ // go for default multipage writer
+ return spaceOccupiedByDefaultWriter;
+ }
+
+ // Maximum space occupied by the columns = maxColumnsInPageZerothSegment * (offset + filter size)
+ int spaceOccupiedBySparseWriter = getSpaceOccupiedBySparseWriter(maxColumnsInPageZerothSegment);
+ pageZeroWriterFlavorSelector.switchPageZeroWriterIfNeeded(spaceOccupiedByDefaultWriter,
+ spaceOccupiedBySparseWriter, adaptive);
+
+ return Math.min(spaceOccupiedBySparseWriter, spaceOccupiedByDefaultWriter);
+ }
+
+ private int getSpaceOccupiedBySparseWriter(int maxColumnsInPageZerothSegment) {
+ int presentColumns = transformerForCurrentTuple.getNumberOfVisitedColumnsInBatch();
+ int numberOfPagesRequired = (int) Math.ceil(
+ (double) (presentColumns - maxColumnsInPageZerothSegment) / IColumnPageZeroWriter.MIN_COLUMN_SPACE);
+ int headerSpace = SparseColumnMultiPageZeroWriter.getHeaderSpace(numberOfPagesRequired);
+ presentColumns = Math.min(presentColumns, maxColumnsInPageZerothSegment);
// space occupied by the sparse writer
- int spaceOccupiedBySparseWriter = presentColumns
+ return headerSpace + presentColumns
* (SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
- int spaceOccupiedByDefaultWriter = totalNumberOfColumns
- * (DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
- pageZeroWriterFlavorSelector.switchPageZeroWriterIfNeeded(spaceOccupiedByDefaultWriter,
- spaceOccupiedBySparseWriter);
- return Math.min(spaceOccupiedBySparseWriter, spaceOccupiedByDefaultWriter);
}
@Override
@@ -193,10 +213,9 @@
}
@Override
- public final int flush(ByteBuffer pageZero, IColumnPageZeroWriter pageZeroWriter) throws HyracksDataException {
+ public final int flush(IColumnPageZeroWriter pageZeroWriter) throws HyracksDataException {
int numberOfColumns = getAbsoluteNumberOfColumns(false);
finalizer.finalizeBatchColumns(columnMetadata, presentColumnsIndexes, pageZeroWriter);
- writer.setPageZeroWriter(pageZero, pageZeroWriter, toIndexArray(presentColumnsIndexes), numberOfColumns);
//assertion logging
int presentColumnsCount = presentColumnsIndexes.cardinality();
@@ -207,6 +226,7 @@
beforeTransformColumnCount, currentTupleColumnsCount, presentColumnsCount);
}
+ writer.setPageZeroWriter(pageZeroWriter, toIndexArray(presentColumnsIndexes), numberOfColumns);
return finalizer.finalizeBatch(writer);
}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
index 466a0cf..293f764 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
@@ -18,12 +18,12 @@
*/
package org.apache.asterix.column.operation.lsm.merge;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
+import java.util.BitSet;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnTupleWriter;
import org.apache.asterix.column.tuple.MergeColumnTupleReference;
import org.apache.asterix.column.util.RunLengthIntArray;
import org.apache.asterix.column.values.IColumnValuesReader;
@@ -32,6 +32,8 @@
import org.apache.asterix.column.zero.PageZeroWriterFlavorSelector;
import org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter;
import org.apache.asterix.column.zero.writers.SparseColumnPageZeroWriter;
+import org.apache.asterix.column.zero.writers.multipage.DefaultColumnMultiPageZeroWriter;
+import org.apache.asterix.column.zero.writers.multipage.SparseColumnMultiPageZeroWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
@@ -44,8 +46,6 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
-
public class MergeColumnTupleWriter extends AbstractColumnTupleWriter {
private final MergeColumnWriteMetadata columnMetadata;
private final int maxLeafNodeSize;
@@ -56,7 +56,7 @@
private final PriorityQueue<IColumnValuesWriter> orderedColumns;
private final ColumnBatchWriter writer;
private final IColumnPageZeroWriterFlavorSelector pageZeroWriterFlavorSelector;
- protected final IntOpenHashSet presentColumnsIndexes;
+ protected final BitSet presentColumnsIndexes;
private final int maxNumberOfTuples;
private int primaryKeysEstimatedSize;
private int numberOfAntiMatter;
@@ -67,7 +67,7 @@
this.columnMetadata = columnMetadata;
this.pageZeroWriterFlavorSelector = new PageZeroWriterFlavorSelector();
this.maxLeafNodeSize = maxLeafNodeSize;
- this.presentColumnsIndexes = new IntOpenHashSet();
+ this.presentColumnsIndexes = new BitSet();
List<IColumnTupleIterator> componentsTuplesList = columnMetadata.getComponentsTuples();
this.componentsTuples = new MergeColumnTupleReference[componentsTuplesList.size()];
int totalLength = 0;
@@ -155,52 +155,78 @@
}
@Override
- public int getColumnOccupiedSpace(boolean includeCurrentTupleColumns) {
- int presentColumns = presentColumnsIndexes.size();
+ public int getPageZeroWriterOccupiedSpace(int maxColumnsInPageZerothSegment, boolean includeCurrentTupleColumns,
+ boolean adaptive) {
int totalNumberOfColumns = getAbsoluteNumberOfColumns(includeCurrentTupleColumns);
+ totalNumberOfColumns = Math.min(totalNumberOfColumns, maxColumnsInPageZerothSegment);
+
+ int spaceOccupiedByDefaultWriter = DefaultColumnMultiPageZeroWriter.EXTENDED_HEADER_SIZE + totalNumberOfColumns
+ * (DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
+
+ if (!adaptive) {
+ // go for default multipage writer
+ return spaceOccupiedByDefaultWriter;
+ }
// space occupied by the sparse writer
- int spaceOccupiedBySparseWriter = presentColumns
- * (SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
- int spaceOccupiedByDefaultWriter = totalNumberOfColumns
- * (DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
+ int spaceOccupiedBySparseWriter = getSpaceOccupiedBySparseWriter(maxColumnsInPageZerothSegment);
pageZeroWriterFlavorSelector.switchPageZeroWriterIfNeeded(spaceOccupiedByDefaultWriter,
- spaceOccupiedBySparseWriter);
+ spaceOccupiedBySparseWriter, adaptive);
return Math.min(spaceOccupiedBySparseWriter, spaceOccupiedByDefaultWriter);
}
+ private int getSpaceOccupiedBySparseWriter(int maxColumnsInPageZerothSegment) {
+ int presentColumns = presentColumnsIndexes.cardinality();
+ int numberOfPagesRequired = (int) Math.ceil(
+ (double) (presentColumns - maxColumnsInPageZerothSegment) / IColumnPageZeroWriter.MIN_COLUMN_SPACE);
+ int headerSpace = SparseColumnMultiPageZeroWriter.getHeaderSpace(numberOfPagesRequired);
+ presentColumns = Math.min(presentColumns, maxColumnsInPageZerothSegment);
+
+ // space occupied by the sparse writer
+ return headerSpace + presentColumns
+ * (SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
+ }
+
@Override
- public int flush(ByteBuffer pageZero, IColumnPageZeroWriter pageZeroWriter) throws HyracksDataException {
+ public int flush(IColumnPageZeroWriter pageZeroWriter) throws HyracksDataException {
// here the numberOfColumns is the total number of columns present in the LSM Index (across all disk components)
// Hence, a merge will fail if union(NumberOfColumns(D1) + NumberOfColumns(D2) + ... + NumberOfColumns(DN)) >
// pageZero space, and since the merged page contains this many number of columns, the first flush will fail.
int numberOfColumns = columnMetadata.getNumberOfColumns();
int numberOfPrimaryKeys = columnMetadata.getNumberOfPrimaryKeys();
+
+ // If writtenComponents is not empty, process non-key columns
if (writtenComponents.getSize() > 0) {
writeNonKeyColumns();
writtenComponents.reset();
}
- for (int columnIndex : presentColumnsIndexes) {
+ // Iterate over the BitSet (presentColumnsIndexes) to get the indexes of the set bits
+ for (int columnIndex = presentColumnsIndexes.nextSetBit(0); columnIndex >= 0; columnIndex =
+ presentColumnsIndexes.nextSetBit(columnIndex + 1)) {
if (columnIndex < numberOfPrimaryKeys) {
- continue;
+ continue; // Skip primary key columns
}
orderedColumns.add(columnMetadata.getWriter(columnIndex));
}
- // reset pageZeroWriter based on the writer
- writer.setPageZeroWriter(pageZero, pageZeroWriter, getPresentColumnsIndexesArray(), numberOfColumns);
+ // Reset pageZeroWriter based on the writer
+ writer.setPageZeroWriter(pageZeroWriter, toIndexArray(presentColumnsIndexes), numberOfColumns);
+
+ // Write primary key columns
writer.writePrimaryKeyColumns(primaryKeyWriters);
+
+ // Write the other columns and get the total length
int totalLength = writer.writeColumns(orderedColumns);
+ // Reset the anti-matter counter for the next batch
numberOfAntiMatter = 0;
+
return totalLength;
}
- public int[] getPresentColumnsIndexesArray() {
- int[] sortedIndexes = presentColumnsIndexes.toIntArray();
- Arrays.sort(sortedIndexes);
- return sortedIndexes;
+ public static int[] toIndexArray(BitSet bitSet) {
+ return FlushColumnTupleWriter.toIndexArray(bitSet);
}
@Override
@@ -230,14 +256,17 @@
for (int i = 0; i < writtenComponents.getNumberOfBlocks(); i++) {
int componentIndex = writtenComponents.getBlockValue(i);
if (componentIndex < 0) {
- //Skip writing values of deleted tuples
+ // Skip writing values of deleted tuples
componentIndex = clearAntimatterIndicator(componentIndex);
skipReaders(componentIndex, writtenComponents.getBlockSize(i));
continue;
}
MergeColumnTupleReference componentTuple = componentsTuples[componentIndex];
int count = writtenComponents.getBlockSize(i);
- for (int columnIndex : presentColumnsIndexes) {
+
+ // Iterate over the set bits in presentColumnsIndexes
+ for (int columnIndex = presentColumnsIndexes.nextSetBit(0); columnIndex >= 0; columnIndex =
+ presentColumnsIndexes.nextSetBit(columnIndex + 1)) {
if (columnIndex < columnMetadata.getNumberOfPrimaryKeys()) {
continue;
}
@@ -263,7 +292,9 @@
private void skipReaders(int componentIndex, int count) throws HyracksDataException {
MergeColumnTupleReference componentTuple = componentsTuples[componentIndex];
try {
- for (int columnIndex : presentColumnsIndexes) {
+ // Iterate over the set bits in presentColumnsIndexes
+ for (int columnIndex = presentColumnsIndexes.nextSetBit(0); columnIndex >= 0; columnIndex =
+ presentColumnsIndexes.nextSetBit(columnIndex + 1)) {
if (columnIndex < columnMetadata.getNumberOfPrimaryKeys()) {
continue;
}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
index 0c9c816..eb12a27 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.column.tuple;
+import java.util.BitSet;
+
import org.apache.asterix.column.bytes.stream.in.MultiByteBufferInputStream;
import org.apache.asterix.column.operation.lsm.merge.IEndOfPageCallBack;
import org.apache.asterix.column.operation.lsm.merge.MergeColumnReadMetadata;
@@ -29,15 +31,13 @@
import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
-
public final class MergeColumnTupleReference extends AbstractAsterixColumnTupleReference {
// NoOP callback is for empty pages only
private static final IEndOfPageCallBack EMPTY_PAGE_CALLBACK = createNoOpCallBack();
private final IColumnValuesReader[] columnReaders;
private int skipCount;
private IEndOfPageCallBack endOfPageCallBack;
- private IntOpenHashSet presentColumnIndexes;
+ private BitSet presentColumnIndexes;
private int mergingLength;
public MergeColumnTupleReference(int componentIndex, ColumnBTreeReadLeafFrame frame,
@@ -155,7 +155,7 @@
};
}
- public void setColumnIndexes(IntOpenHashSet presentColumnsIndexes) {
+ public void setColumnIndexes(BitSet presentColumnsIndexes) {
this.presentColumnIndexes = presentColumnsIndexes;
}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java
index 45d0008..ade1d38 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.column.values;
-import java.nio.ByteBuffer;
import java.util.PriorityQueue;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -41,13 +40,12 @@
* abstraction that supports different page zero layouts (default vs sparse).
* The writer will be used to manage column offsets, filters, and primary key storage.
*
- * @param pageZero The page zero buffer where metadata will be written
* @param pageZeroWriter The writer implementation for page zero operations
* @param presentColumnsIndexes Array of column indexes that contain data in this batch
* @param numberOfColumns Total number of columns in the schema
*/
- void setPageZeroWriter(ByteBuffer pageZero, IColumnPageZeroWriter pageZeroWriter, int[] presentColumnsIndexes,
- int numberOfColumns);
+ void setPageZeroWriter(IColumnPageZeroWriter pageZeroWriter, int[] presentColumnsIndexes, int numberOfColumns)
+ throws HyracksDataException;
/**
* Writes the primary keys' values to Page0
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java
index 1a767a9..e1bf8e3 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.column.values.writer;
-import java.nio.ByteBuffer;
import java.util.PriorityQueue;
import org.apache.asterix.column.bytes.stream.out.MultiPersistentBufferBytesOutputStream;
@@ -61,17 +60,16 @@
* This method replaces the direct page zero buffer manipulation with a more abstracted approach,
* which allows for different page zero layouts (default or sparse).
*
- * @param pageZero The page zero buffer to be used
* @param pageZeroWriter The writer implementation for page zero operations
* @param presentColumnsIndexes Array containing the indexes of columns present in this batch
* @param numberOfColumns Total number of columns in the schema
*/
- public void setPageZeroWriter(ByteBuffer pageZero, IColumnPageZeroWriter pageZeroWriter,
- int[] presentColumnsIndexes, int numberOfColumns) {
+ public void setPageZeroWriter(IColumnPageZeroWriter pageZeroWriter, int[] presentColumnsIndexes,
+ int numberOfColumns) throws HyracksDataException {
this.pageZeroWriter = pageZeroWriter;
- pageZeroWriter.reset(pageZero, presentColumnsIndexes, numberOfColumns);
+ pageZeroWriter.resetBasedOnColumns(presentColumnsIndexes, numberOfColumns);
pageZeroWriter.allocateColumns();
- nonKeyColumnStartOffset = pageZeroWriter.getPageZeroBuffer().capacity();
+ nonKeyColumnStartOffset = pageZeroWriter.getPageZeroBufferCapacity();
}
@Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/PageZeroWriterFlavorSelector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/PageZeroWriterFlavorSelector.java
index da7c9f2..c4f9bd5 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/PageZeroWriterFlavorSelector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/PageZeroWriterFlavorSelector.java
@@ -19,12 +19,19 @@
package org.apache.asterix.column.zero;
import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter.DEFAULT_WRITER_FLAG;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter.MULTI_PAGE_DEFAULT_WRITER_FLAG;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter.MULTI_PAGE_SPARSE_WRITER_FLAG;
import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter.SPARSE_WRITER_FLAG;
import org.apache.asterix.column.zero.readers.DefaultColumnPageZeroReader;
import org.apache.asterix.column.zero.readers.SparseColumnPageZeroReader;
import org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter;
import org.apache.asterix.column.zero.writers.SparseColumnPageZeroWriter;
+import org.apache.asterix.column.zero.writers.multipage.DefaultColumnMultiPageZeroReader;
+import org.apache.asterix.column.zero.writers.multipage.DefaultColumnMultiPageZeroWriter;
+import org.apache.asterix.column.zero.writers.multipage.SparseColumnMultiPageZeroReader;
+import org.apache.asterix.column.zero.writers.multipage.SparseColumnMultiPageZeroWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroReader;
import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter;
import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriterFlavorSelector;
@@ -43,7 +50,7 @@
*/
public class PageZeroWriterFlavorSelector implements IColumnPageZeroWriterFlavorSelector {
// Flag indicating which writer type is currently selected (DEFAULT_WRITER_FLAG=default, SPARSE_WRITER_FLAG=sparse)
- protected byte writerFlag = DEFAULT_WRITER_FLAG;
+ protected byte writerFlag = MULTI_PAGE_DEFAULT_WRITER_FLAG;
// Cache of writer instances to avoid repeated object creation
private final Byte2ObjectArrayMap<IColumnPageZeroWriter> writers;
@@ -65,13 +72,19 @@
* @param spaceOccupiedBySparseWriter Space in bytes required by the sparse writer
*/
@Override
- public void switchPageZeroWriterIfNeeded(int spaceOccupiedByDefaultWriter, int spaceOccupiedBySparseWriter) {
+ public void switchPageZeroWriterIfNeeded(int spaceOccupiedByDefaultWriter, int spaceOccupiedBySparseWriter,
+ boolean adaptive) {
+ if (!adaptive) {
+ // If not adaptive, always use the default multi-page writer
+ writerFlag = MULTI_PAGE_DEFAULT_WRITER_FLAG;
+ return;
+ }
if (spaceOccupiedByDefaultWriter <= spaceOccupiedBySparseWriter) {
// Default writer is more space-efficient (or equal), use it
- writerFlag = DEFAULT_WRITER_FLAG;
+ writerFlag = MULTI_PAGE_DEFAULT_WRITER_FLAG;
} else {
// Sparse writer is more space-efficient, use it
- writerFlag = SPARSE_WRITER_FLAG;
+ writerFlag = MULTI_PAGE_SPARSE_WRITER_FLAG;
}
}
@@ -83,12 +96,15 @@
* @throws IllegalStateException if an unsupported writer flag is encountered
*/
@Override
- public IColumnPageZeroWriter getPageZeroWriter() {
- return switch (writerFlag) {
- case DEFAULT_WRITER_FLAG -> writers.computeIfAbsent(DEFAULT_WRITER_FLAG, k -> new DefaultColumnPageZeroWriter());
- case SPARSE_WRITER_FLAG -> writers.computeIfAbsent(SPARSE_WRITER_FLAG, k -> new SparseColumnPageZeroWriter());
- default -> throw new IllegalStateException("Unsupported writer flag: " + writerFlag);
- };
+ public IColumnPageZeroWriter getPageZeroWriter(IColumnWriteMultiPageOp multiPageOpRef, int zerothSegmentMaxColumns,
+ int bufferCapacity) {
+ return switch (writerFlag) {
+ case DEFAULT_WRITER_FLAG -> writers.computeIfAbsent(DEFAULT_WRITER_FLAG, k -> new DefaultColumnPageZeroWriter());
+ case SPARSE_WRITER_FLAG -> writers.computeIfAbsent(SPARSE_WRITER_FLAG, k -> new SparseColumnPageZeroWriter());
+ case MULTI_PAGE_DEFAULT_WRITER_FLAG -> writers.computeIfAbsent(MULTI_PAGE_DEFAULT_WRITER_FLAG, k -> new DefaultColumnMultiPageZeroWriter(multiPageOpRef, zerothSegmentMaxColumns, bufferCapacity));
+ case MULTI_PAGE_SPARSE_WRITER_FLAG -> writers.computeIfAbsent(MULTI_PAGE_SPARSE_WRITER_FLAG, k -> new SparseColumnMultiPageZeroWriter(multiPageOpRef, zerothSegmentMaxColumns, bufferCapacity));
+ default -> throw new IllegalStateException("Unsupported writer flag: " + writerFlag);
+ };
}
/**
@@ -101,11 +117,13 @@
* @throws IllegalStateException if an unsupported reader flag is encountered
*/
@Override
- public IColumnPageZeroReader createPageZeroReader(byte flag) {
- return switch (flag) {
- case DEFAULT_WRITER_FLAG -> readers.computeIfAbsent(DEFAULT_WRITER_FLAG, k -> new DefaultColumnPageZeroReader());
- case SPARSE_WRITER_FLAG -> readers.computeIfAbsent(SPARSE_WRITER_FLAG, k -> new SparseColumnPageZeroReader());
- default -> throw new IllegalStateException("Unsupported reader flag: " + flag);
- };
+ public IColumnPageZeroReader createPageZeroReader(byte flag, int bufferCapacity) {
+ return switch (flag) {
+ case DEFAULT_WRITER_FLAG -> readers.computeIfAbsent(DEFAULT_WRITER_FLAG, k -> new DefaultColumnPageZeroReader());
+ case SPARSE_WRITER_FLAG -> readers.computeIfAbsent(SPARSE_WRITER_FLAG, k -> new SparseColumnPageZeroReader());
+ case MULTI_PAGE_DEFAULT_WRITER_FLAG -> readers.computeIfAbsent(MULTI_PAGE_DEFAULT_WRITER_FLAG, k -> new DefaultColumnMultiPageZeroReader(bufferCapacity));
+ case MULTI_PAGE_SPARSE_WRITER_FLAG -> readers.computeIfAbsent(MULTI_PAGE_SPARSE_WRITER_FLAG, k -> new SparseColumnMultiPageZeroReader(bufferCapacity));
+ default -> throw new IllegalStateException("Unsupported reader flag: " + flag);
+ };
}
}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java
index b643fd6..1692e70 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.column.zero.readers;
-import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.HEADER_SIZE;
import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.LEFT_MOST_KEY_OFFSET;
import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.MEGA_LEAF_NODE_LENGTH;
import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.NEXT_LEAF_OFFSET;
@@ -27,48 +26,67 @@
import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.TUPLE_COUNT_OFFSET;
import java.nio.ByteBuffer;
+import java.util.BitSet;
import org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IntPairUtil;
import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroReader;
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
-
public class DefaultColumnPageZeroReader implements IColumnPageZeroReader {
protected ByteBuffer pageZeroBuf;
+ protected BitSet pageZeroSegmentsPages;
+ protected int numberOfPresentColumns;
+ protected int headerSize;
+
+ public DefaultColumnPageZeroReader() {
+ this.pageZeroSegmentsPages = new BitSet();
+ }
@Override
- public void reset(ByteBuffer pageZeroBuf) {
+ public void reset(ByteBuffer pageZeroBuf, int headerSize) {
this.pageZeroBuf = pageZeroBuf;
+ this.numberOfPresentColumns = pageZeroBuf.getInt(NUMBER_OF_COLUMNS_OFFSET);
+ this.headerSize = headerSize;
+ }
+
+ public void reset(ByteBuffer pageZeroBuf, int numberOfPresentColumns, int headerSize) {
+ this.pageZeroBuf = pageZeroBuf;
+ this.numberOfPresentColumns = numberOfPresentColumns;
+ this.headerSize = headerSize;
}
@Override
public int getColumnOffset(int columnIndex) {
- return pageZeroBuf.getInt(HEADER_SIZE + columnIndex * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE);
+ return pageZeroBuf.getInt(headerSize + columnIndex * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE);
}
- @Override
- public int getColumnFilterOffset(int columnIndex) {
- int columnsOffsetEnd =
- HEADER_SIZE + getNumberOfPresentColumns() * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+ protected int getColumnFilterOffset(int columnIndex) {
+ int columnsOffsetEnd = headerSize + numberOfPresentColumns * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
return columnsOffsetEnd + columnIndex * DefaultColumnPageZeroWriter.FILTER_SIZE;
}
@Override
- public long getLong(int offset) {
- return pageZeroBuf.getLong(offset);
+ public long getColumnFilterMin(int columnIndex) {
+ int filterOffset = getColumnFilterOffset(columnIndex);
+ return pageZeroBuf.getLong(filterOffset);
+ }
+
+ @Override
+ public long getColumnFilterMax(int columnIndex) {
+ int filterOffset = getColumnFilterOffset(columnIndex);
+ return pageZeroBuf.getLong(filterOffset + Long.BYTES);
}
@Override
public void skipFilters() {
- int filterEndOffset = getColumnFilterOffset(getNumberOfPresentColumns());
+ int filterEndOffset = getColumnFilterOffset(numberOfPresentColumns);
pageZeroBuf.position(filterEndOffset);
}
@Override
public void skipColumnOffsets() {
- int columnEndOffset =
- HEADER_SIZE + getNumberOfPresentColumns() * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+ int columnEndOffset = headerSize + numberOfPresentColumns * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
pageZeroBuf.position(columnEndOffset);
}
@@ -78,6 +96,11 @@
}
@Override
+ public int getNumberOfPageZeroSegments() {
+ return 1;
+ }
+
+ @Override
public int getLeftMostKeyOffset() {
return pageZeroBuf.getInt(LEFT_MOST_KEY_OFFSET);
}
@@ -114,16 +137,13 @@
@Override
public boolean isValidColumn(int columnIndex) {
- int relativeColumnIndex = getRelativeColumnIndex(columnIndex);
- return relativeColumnIndex < getNumberOfPresentColumns();
+ return columnIndex < numberOfPresentColumns;
}
@Override
- public void getAllColumns(IntOpenHashSet presentColumns) {
- int numberOfColumns = getNumberOfPresentColumns();
- for (int i = 0; i < numberOfColumns; i++) {
- presentColumns.add(i);
- }
+ public void getAllColumns(BitSet presentColumns) {
+ int numberOfColumns = numberOfPresentColumns;
+ presentColumns.set(0, numberOfColumns);
}
@Override
@@ -133,11 +153,33 @@
@Override
public void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
- int columnOffsetStart = HEADER_SIZE;
+ int columnOffsetStart = headerSize;
for (int i = 0; i < offsetColumnIndexPairs.length; i++) {
int offset = pageZeroBuf.getInt(columnOffsetStart);
offsetColumnIndexPairs[i] = IntPairUtil.of(offset, i);
columnOffsetStart += DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
}
}
+
+ @Override
+ public BitSet getPageZeroSegmentsPages() {
+ return pageZeroSegmentsPages;
+ }
+
+ @Override
+ public int getHeaderSize() {
+ return headerSize;
+ }
+
+ @Override
+ public void resetStream(IColumnBufferProvider pageZeroSegmentBufferProvider) {
+ throw new UnsupportedOperationException("Not supported for DefaultColumnPageZeroReader");
+ }
+
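+ // A single-page pageZero has exactly one segment, so only segment 0 is ever marked as required.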
+ @Override
+ public BitSet markRequiredPageSegments(BitSet projectedColumns, int pageZeroId, boolean markAll) {
+ pageZeroSegmentsPages.clear();
+ pageZeroSegmentsPages.set(0);
+ return pageZeroSegmentsPages;
+ }
}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/SparseColumnPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/SparseColumnPageZeroReader.java
index 44d2369..3b4fdc4 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/SparseColumnPageZeroReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/SparseColumnPageZeroReader.java
@@ -18,16 +18,14 @@
*/
package org.apache.asterix.column.zero.readers;
-import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.HEADER_SIZE;
-
import java.nio.ByteBuffer;
+import java.util.BitSet;
import org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter;
import org.apache.asterix.column.zero.writers.SparseColumnPageZeroWriter;
import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IntPairUtil;
import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
public class SparseColumnPageZeroReader extends DefaultColumnPageZeroReader {
private final Int2IntOpenHashMap columnIndexToRelativeColumnIndex;
@@ -38,8 +36,8 @@
}
@Override
- public void reset(ByteBuffer pageZeroBuf) {
- super.reset(pageZeroBuf);
+ public void reset(ByteBuffer pageZeroBuf, int headerSize) {
+ super.reset(pageZeroBuf, headerSize);
columnIndexToRelativeColumnIndex.clear();
}
@@ -47,28 +45,26 @@
public int getColumnOffset(int columnIndex) {
int relativeColumnIndex = getRelativeColumnIndex(columnIndex);
return pageZeroBuf.getInt(
- HEADER_SIZE + relativeColumnIndex * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + Integer.BYTES);
+ headerSize + relativeColumnIndex * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + Integer.BYTES);
}
@Override
public int getColumnFilterOffset(int columnIndex) {
int relativeColumnIndex = getRelativeColumnIndex(columnIndex);
- int columnsOffsetEnd =
- HEADER_SIZE + getNumberOfPresentColumns() * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+ int columnsOffsetEnd = headerSize + numberOfPresentColumns * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
return columnsOffsetEnd + relativeColumnIndex * DefaultColumnPageZeroWriter.FILTER_SIZE;
}
@Override
public void skipFilters() {
- int filterEndOffset = HEADER_SIZE + getNumberOfPresentColumns()
- * (SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
+ int filterEndOffset = headerSize + numberOfPresentColumns
+ * (SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + SparseColumnPageZeroWriter.FILTER_SIZE);
pageZeroBuf.position(filterEndOffset);
}
@Override
public void skipColumnOffsets() {
- int columnsOffsetEnd =
- HEADER_SIZE + getNumberOfPresentColumns() * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+ int columnsOffsetEnd = headerSize + numberOfPresentColumns * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
pageZeroBuf.position(columnsOffsetEnd);
}
@@ -87,7 +83,7 @@
return 0;
}
- int totalColumns = getNumberOfPresentColumns();
+ int totalColumns = numberOfPresentColumns;
int lastColumnIndex = getColumnIndex(totalColumns - 1);
int lastColumn = pageZeroBuf.getInt(lastColumnIndex);
if (lastColumn == columnIndex) {
@@ -115,7 +111,7 @@
}
private int getColumnIndex(int index) {
- return HEADER_SIZE + index * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+ return headerSize + index * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
}
@Override
@@ -125,11 +121,17 @@
}
@Override
- public void getAllColumns(IntOpenHashSet presentColumns) {
- int columnIndex = getColumnIndex(0);
- for (int i = 0; i < getNumberOfPresentColumns(); i++) {
+ public void getAllColumns(BitSet presentColumns) {
+ if (numberOfPresentColumns == 0) {
+ return;
+ }
+
+ int columnIndex = headerSize;
+ int limit = columnIndex + numberOfPresentColumns * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+
+ while (columnIndex < limit) {
int column = pageZeroBuf.getInt(columnIndex);
- presentColumns.add(column);
+ presentColumns.set(column);
columnIndex += SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
}
}
@@ -137,7 +139,7 @@
@Override
public void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
int columnIndex = getColumnIndex(0);
- for (int i = 0; i < getNumberOfPresentColumns(); i++) {
+ for (int i = 0; i < numberOfPresentColumns; i++) {
int column = pageZeroBuf.getInt(columnIndex);
int offset = pageZeroBuf.getInt(columnIndex + SparseColumnPageZeroWriter.COLUMN_INDEX_SIZE);
offsetColumnIndexPairs[i] = IntPairUtil.of(offset, column);
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/DefaultColumnPageZeroWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/DefaultColumnPageZeroWriter.java
index 3d7aa93..e7d4e66 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/DefaultColumnPageZeroWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/DefaultColumnPageZeroWriter.java
@@ -18,21 +18,33 @@
*/
package org.apache.asterix.column.zero.writers;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.FLAG_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.HEADER_SIZE;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.LEFT_MOST_KEY_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.MEGA_LEAF_NODE_LENGTH;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.NUMBER_OF_COLUMNS_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.RIGHT_MOST_KEY_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.SIZE_OF_COLUMNS_OFFSETS_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.TUPLE_COUNT_OFFSET;
+
import java.nio.ByteBuffer;
import java.util.BitSet;
import org.apache.asterix.column.bytes.stream.out.ByteBufferOutputStream;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter;
import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IValuesWriter;
/**
* Default implementation of page zero writer that allocates space for all columns in the schema.
- *
+ * <p>
* This writer uses a fixed layout where every column in the schema has a reserved slot,
* regardless of whether data is present for that column. This approach is optimal for
* dense datasets where most columns contain data.
- *
+ * <p>
* Memory layout in page zero:
* 1. Column offsets: 4 bytes per column (numberOfColumns * 4 bytes)
* 2. Column filters: 16 bytes per column (numberOfColumns * 16 bytes) - min/max values
@@ -44,24 +56,26 @@
/** Size in bytes for storing column filter (min + max values) */
public static final int FILTER_SIZE = Long.BYTES * 2; // min and max
- private final ByteBufferOutputStream primaryKeys;
- private ByteBuffer pageZero;
+ protected final ByteBufferOutputStream primaryKeys;
+ protected ByteBuffer pageZero;
+ protected int headerSize;
private int numberOfColumns;
// Offset positions within page zero buffer
- private int primaryKeysOffset; // Where primary key data starts
- private int columnsOffset; // Where column offset array starts
- private int filtersOffset; // Where column filter array starts
+ protected int primaryKeysOffset; // Where primary key data starts
+ protected int columnsOffset; // Where column offset array starts
+ protected int filtersOffset; // Where column filter array starts
public DefaultColumnPageZeroWriter() {
primaryKeys = new ByteBufferOutputStream();
}
@Override
- public void reset(ByteBuffer pageZeroBuf, int[] presentColumns, int numberOfColumns) {
- this.pageZero = pageZeroBuf;
+ public void resetBasedOnColumns(int[] presentColumns, int numberOfColumns, int headerSize) {
this.numberOfColumns = numberOfColumns;
- this.primaryKeysOffset = pageZeroBuf.position();
+ primaryKeysOffset = headerSize;
+ this.headerSize = headerSize;
+ pageZero.position(headerSize);
}
@Override
@@ -71,7 +85,7 @@
/**
* Allocates space in page zero for all column metadata.
- *
+ * <p>
* The allocation strategy reserves space for all columns in the schema:
* - Column offsets: Fixed array of 4-byte integers
* - Column filters: Fixed array of 16-byte min/max pairs
@@ -140,6 +154,48 @@
}
@Override
+ public void setPageZero(ByteBuffer pageZero) {
+ // this method is used to set the pageZero buffer
+ // only caller is the MultiColumnPageZeroWriter
+ this.pageZero = pageZero;
+ }
+
+ public void flush(ByteBuffer buf, int numberOfTuples, ITupleReference minKey, ITupleReference maxKey,
+ AbstractColumnTupleWriter columnWriter, ITreeIndexTupleWriter rowTupleWriter) throws HyracksDataException {
+ this.pageZero = buf;
+ // Prepare the space for writing the columns' information such as the primary keys
+ pageZero.position(HEADER_SIZE);
+ this.primaryKeysOffset = buf.position();
+ // Flush the columns to persistence pages and write the length of the mega leaf node in pageZero
+ pageZero.putInt(MEGA_LEAF_NODE_LENGTH, columnWriter.flush(this));
+ // Write min and max keys
+ int offset = buf.position();
+ buf.putInt(LEFT_MOST_KEY_OFFSET, offset);
+ offset += rowTupleWriter.writeTuple(minKey, buf.array(), offset);
+ buf.putInt(RIGHT_MOST_KEY_OFFSET, offset);
+ rowTupleWriter.writeTuple(maxKey, buf.array(), offset);
+
+ // Write page information
+ buf.putInt(TUPLE_COUNT_OFFSET, numberOfTuples);
+ buf.put(FLAG_OFFSET, flagCode());
+ buf.putInt(NUMBER_OF_COLUMNS_OFFSET, getNumberOfColumns());
+ buf.putInt(SIZE_OF_COLUMNS_OFFSETS_OFFSET, getColumnOffsetsSize());
+
+ // reset the collected meta info
+ columnWriter.reset();
+ }
+
+ public void flush(ByteBuffer buf, int numberOfTuples, AbstractColumnTupleWriter writer)
+ throws HyracksDataException {
+ this.pageZero = buf;
+ pageZero.position(HEADER_SIZE);
+ this.primaryKeysOffset = buf.position();
+ pageZero.putInt(MEGA_LEAF_NODE_LENGTH, writer.flush(this));
+ buf.putInt(NUMBER_OF_COLUMNS_OFFSET, getNumberOfColumns());
+ buf.putInt(TUPLE_COUNT_OFFSET, numberOfTuples);
+ }
+
+ @Override
public int getNumberOfColumns() {
return numberOfColumns;
}
@@ -158,8 +214,8 @@
}
@Override
- public ByteBuffer getPageZeroBuffer() {
- return pageZero;
+ public int getPageZeroBufferCapacity() {
+ return pageZero.capacity();
}
/**
@@ -177,4 +233,9 @@
public int getColumnOffsetsSize() {
return numberOfColumns * COLUMN_OFFSET_SIZE;
}
+
+ @Override
+ public int getHeaderSize() {
+ return headerSize;
+ }
}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/SparseColumnPageZeroWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/SparseColumnPageZeroWriter.java
index f74b575..4fc2550 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/SparseColumnPageZeroWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/SparseColumnPageZeroWriter.java
@@ -20,55 +20,53 @@
import static org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter.FILTER_SIZE;
-import java.nio.ByteBuffer;
import java.util.BitSet;
-import org.apache.asterix.column.bytes.stream.out.ByteBufferOutputStream;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter;
import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IValuesWriter;
/**
* Sparse implementation of page zero writer that only allocates space for present columns.
- *
+ * <p>
* This writer optimizes space usage for sparse datasets by storing only the columns
* that actually contain data. Each column entry includes both the column index and
* its data offset, allowing for efficient lookup while minimizing space overhead.
- *
+ * <p>
* Memory layout in page zero:
* 1. Column entries: 8 bytes per present column (4 bytes index + 4 bytes offset)
* 2. Column filters: 16 bytes per present column (min/max values)
* 3. Primary key data: variable size, written sequentially
- *
+ * <p>
* This layout is particularly beneficial when the number of present columns is
* significantly smaller than the total schema size.
*/
-public class SparseColumnPageZeroWriter implements IColumnPageZeroWriter {
+public class SparseColumnPageZeroWriter extends DefaultColumnPageZeroWriter {
/** Size in bytes for storing a column index */
public static final int COLUMN_INDEX_SIZE = Integer.BYTES;
/** Size in bytes for storing a column entry (index + offset) */
public static final int COLUMN_OFFSET_SIZE = Integer.BYTES + COLUMN_INDEX_SIZE;
- private final ByteBufferOutputStream primaryKeys;
private int[] presentColumns;
private int numberOfPresentColumns;
- private ByteBuffer pageZero;
-
- // Offset positions within page zero buffer
- private int primaryKeysOffset; // Where primary key data starts
- private int columnsOffset; // Where column entries array starts
- private int filtersOffset; // Where column filter array starts
public SparseColumnPageZeroWriter() {
- primaryKeys = new ByteBufferOutputStream();
+ super();
}
@Override
- public void reset(ByteBuffer pageZeroBuf, int[] presentColumns, int numberOfColumns /* not being used */) {
- this.pageZero = pageZeroBuf;
+ public void resetBasedOnColumns(int[] presentColumns, int numberOfColumns /* not being used */, int headerSize) {
this.presentColumns = presentColumns;
this.numberOfPresentColumns = presentColumns.length;
- this.primaryKeysOffset = pageZeroBuf.position();
+ this.primaryKeysOffset = headerSize;
+ this.headerSize = headerSize;
+ pageZero.position(headerSize);
+ }
+
+ public void resetInnerBasedOnColumns(int[] presentColumns, int numberOfPresentColumns, int headerSize) {
+ this.presentColumns = presentColumns;
+ this.numberOfPresentColumns = numberOfPresentColumns;
+ this.primaryKeysOffset = headerSize; // Reset primary keys offset for sparse layout
+ pageZero.position(headerSize);
}
@Override
@@ -159,9 +157,9 @@
* @param columnIndex The absolute column index to find
* @return the relative position within present columns, or -1 if not found
*/
- private int findColumnIndex(int columnIndex) {
+ public int findColumnIndex(int[] presentColumns, int numberOfPresentColumns, int columnIndex) {
int low = 0;
- int high = presentColumns.length - 1;
+ int high = numberOfPresentColumns - 1;
while (low <= high) {
int mid = (low + high) >>> 1;
int midVal = presentColumns[mid];
@@ -194,13 +192,8 @@
}
@Override
- public ByteBuffer getPageZeroBuffer() {
- return pageZero;
- }
-
- @Override
public int getNumberOfColumns() {
- return presentColumns.length;
+ return numberOfPresentColumns;
}
/**
@@ -215,7 +208,7 @@
*/
@Override
public int getRelativeColumnIndex(int columnIndex) {
- int columnRelativeIndex = findColumnIndex(columnIndex);
+ int columnRelativeIndex = findColumnIndex(presentColumns, numberOfPresentColumns, columnIndex);
if (columnRelativeIndex == -1) {
throw new IllegalStateException("Column index " + columnIndex + " does not exist in present columns.");
}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/AbstractColumnMultiPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/AbstractColumnMultiPageZeroReader.java
new file mode 100644
index 0000000..3b05fc3
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/AbstractColumnMultiPageZeroReader.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.zero.writers.multipage;
+
+import org.apache.asterix.column.bytes.stream.in.MultiPageZeroByteBuffersReader;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroReader;
+
+public abstract class AbstractColumnMultiPageZeroReader implements IColumnPageZeroReader {
+ protected MultiPageZeroByteBuffersReader segmentBuffers;
+
+ AbstractColumnMultiPageZeroReader() {
+ segmentBuffers = new MultiPageZeroByteBuffersReader();
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroReader.java
new file mode 100644
index 0000000..db800ae
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroReader.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.zero.writers.multipage;
+
+import static org.apache.asterix.column.zero.writers.multipage.DefaultColumnMultiPageZeroWriter.EXTENDED_HEADER_SIZE;
+import static org.apache.asterix.column.zero.writers.multipage.DefaultColumnMultiPageZeroWriter.MAX_COLUMNS_IN_ZEROTH_SEGMENT;
+import static org.apache.asterix.column.zero.writers.multipage.DefaultColumnMultiPageZeroWriter.NUMBER_OF_PAGE_ZERO_SEGMENTS_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.LEFT_MOST_KEY_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.MEGA_LEAF_NODE_LENGTH;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.NEXT_LEAF_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.NUMBER_OF_COLUMNS_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.RIGHT_MOST_KEY_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.TUPLE_COUNT_OFFSET;
+
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.asterix.column.zero.readers.DefaultColumnPageZeroReader;
+import org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IntPairUtil;
+
+public class DefaultColumnMultiPageZeroReader extends AbstractColumnMultiPageZeroReader {
+ public static final int headerSize = DefaultColumnMultiPageZeroWriter.EXTENDED_HEADER_SIZE;
+ private final DefaultColumnPageZeroReader zerothSegmentReader;
+
+ private final int maxNumberOfColumnsInAPage;
+ private final BitSet pageZeroSegmentsPages;
+ private int zerothSegmentMaxColumns;
+ private int numberOfPageZeroSegments; // includes the zeroth segment
+ private ByteBuffer pageZeroBuf;
+
+ private final VoidPointable offsetPointable;
+
+ public DefaultColumnMultiPageZeroReader(int bufferCapacity) {
+ super();
+ zerothSegmentReader = new DefaultColumnPageZeroReader();
+ this.pageZeroSegmentsPages = new BitSet();
+ this.maxNumberOfColumnsInAPage = bufferCapacity
+ / (DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
+ this.offsetPointable = new VoidPointable();
+ }
+
+ @Override
+ public void resetStream(IColumnBufferProvider pageZeroSegmentBufferProvider) throws HyracksDataException {
+ segmentBuffers.reset(pageZeroSegmentBufferProvider);
+ }
+
+ @Override
+ public void reset(ByteBuffer pageZeroBuf) {
+ this.pageZeroBuf = pageZeroBuf;
+ zerothSegmentMaxColumns = pageZeroBuf.getInt(MAX_COLUMNS_IN_ZEROTH_SEGMENT);
+ zerothSegmentReader.reset(pageZeroBuf, Math.min(zerothSegmentMaxColumns, getNumberOfPresentColumns()),
+ headerSize);
+ numberOfPageZeroSegments = pageZeroBuf.getInt(NUMBER_OF_PAGE_ZERO_SEGMENTS_OFFSET);
+ }
+
+ @Override
+ public void reset(ByteBuffer pageZeroBuf, int headerSize) {
+ throw new UnsupportedOperationException("This method should not be called for multi-page readers.");
+ }
+
+ @Override
+ public int getColumnOffset(int columnIndex) throws HyracksDataException {
+ try {
+ if (columnIndex < zerothSegmentMaxColumns) {
+ return zerothSegmentReader.getColumnOffset(columnIndex);
+ } else {
+ int segmentIndex = (columnIndex - zerothSegmentMaxColumns) / maxNumberOfColumnsInAPage;
+ int columnIndexInRequiredSegment = (columnIndex - zerothSegmentMaxColumns) % maxNumberOfColumnsInAPage;
+ int segmentOffset = columnIndexInRequiredSegment * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+ segmentBuffers.read(segmentIndex, offsetPointable, segmentOffset,
+ DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE);
+ return IntegerPointable.getInteger(offsetPointable.getByteArray(), offsetPointable.getStartOffset());
+ }
+ } catch (EOFException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public int getNumberOfPageZeroSegments() {
+ return numberOfPageZeroSegments;
+ }
+
+ @Override
+ public BitSet getPageZeroSegmentsPages() {
+ // If pageZeroSegmentsPages is null, it means that the CloudReadContext is not being used,
+ // which indicates that all the segments are being read.
+ return pageZeroSegmentsPages;
+ }
+
+ @Override
+ public long getColumnFilterMin(int columnIndex) throws HyracksDataException {
+ try {
+ if (columnIndex < zerothSegmentMaxColumns) {
+ return zerothSegmentReader.getColumnFilterMin(columnIndex);
+ } else {
+ int segmentIndex = (columnIndex - zerothSegmentMaxColumns) / maxNumberOfColumnsInAPage;
+ int columnIndexInRequiredSegment = (columnIndex - zerothSegmentMaxColumns) % maxNumberOfColumnsInAPage;
+ int segmentOffset =
+ findNumberOfColumnsInSegment(segmentIndex) * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE
+ + columnIndexInRequiredSegment * DefaultColumnPageZeroWriter.FILTER_SIZE;
+ segmentBuffers.read(segmentIndex, offsetPointable, segmentOffset, Long.BYTES);
+ return LongPointable.getLong(offsetPointable.getByteArray(), offsetPointable.getStartOffset());
+ }
+ } catch (EOFException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public long getColumnFilterMax(int columnIndex) throws HyracksDataException {
+ try {
+ if (columnIndex < zerothSegmentMaxColumns) {
+ return zerothSegmentReader.getColumnFilterMax(columnIndex);
+ } else {
+ int segmentIndex = (columnIndex - zerothSegmentMaxColumns) / maxNumberOfColumnsInAPage;
+ int columnIndexInRequiredSegment = (columnIndex - zerothSegmentMaxColumns) % maxNumberOfColumnsInAPage;
+ int segmentOffset =
+ findNumberOfColumnsInSegment(segmentIndex) * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE
+ + columnIndexInRequiredSegment * DefaultColumnPageZeroWriter.FILTER_SIZE;
+ segmentOffset += Long.BYTES; // Move to the max value in the filter
+ segmentBuffers.read(segmentIndex, offsetPointable, segmentOffset, Long.BYTES);
+ return LongPointable.getLong(offsetPointable.getByteArray(), offsetPointable.getStartOffset());
+ }
+ } catch (EOFException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ private int findNumberOfColumnsInSegment(int segmentIndex) {
+ // segmentIndex is 0-based over the segments that follow the zeroth segment
+ if (segmentIndex == numberOfPageZeroSegments - 2) {
+ return getNumberOfPresentColumns() - zerothSegmentMaxColumns
+ - (numberOfPageZeroSegments - 2) * maxNumberOfColumnsInAPage;
+ }
+ // Every segment after the zeroth holds the maximum number of columns that fit in a page; only the last segment (handled above) may hold fewer.
+ return maxNumberOfColumnsInAPage;
+ }
+
+ @Override
+ public void skipFilters() {
+ zerothSegmentReader.skipFilters();
+ }
+
+ @Override
+ public void skipColumnOffsets() {
+ zerothSegmentReader.skipColumnOffsets();
+ }
+
+ @Override
+ public int getTupleCount() {
+ return pageZeroBuf.getInt(TUPLE_COUNT_OFFSET);
+ }
+
+ @Override
+ public int getLeftMostKeyOffset() {
+ return pageZeroBuf.getInt(LEFT_MOST_KEY_OFFSET);
+ }
+
+ @Override
+ public int getRightMostKeyOffset() {
+ return pageZeroBuf.getInt(RIGHT_MOST_KEY_OFFSET);
+ }
+
+ @Override
+ public int getNumberOfPresentColumns() {
+ return pageZeroBuf.getInt(NUMBER_OF_COLUMNS_OFFSET);
+ }
+
+ @Override
+ public int getRelativeColumnIndex(int columnIndex) {
+ return columnIndex;
+ }
+
+ @Override
+ public int getNextLeaf() {
+ return pageZeroBuf.getInt(NEXT_LEAF_OFFSET);
+ }
+
+ @Override
+ public int getMegaLeafNodeLengthInBytes() {
+ return pageZeroBuf.getInt(MEGA_LEAF_NODE_LENGTH);
+ }
+
+ @Override
+ public int getPageZeroCapacity() {
+ return pageZeroBuf.capacity();
+ }
+
+ @Override
+ public boolean isValidColumn(int columnIndex) {
+ return columnIndex < getNumberOfPresentColumns();
+ }
+
+ @Override
+ public void getAllColumns(BitSet presentColumns) {
+ int numberOfColumns = getNumberOfPresentColumns();
+ presentColumns.set(0, numberOfColumns);
+ }
+
+ @Override
+ public ByteBuffer getPageZeroBuf() {
+ return pageZeroBuf;
+ }
+
+ @Override
+ public void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
+ int columnOffsetStart = headerSize;
+ for (int i = 0; i < Math.min(offsetColumnIndexPairs.length, zerothSegmentMaxColumns); i++) {
+ // search in the 0th segment
+ int offset = pageZeroBuf.getInt(columnOffsetStart);
+ offsetColumnIndexPairs[i] = IntPairUtil.of(offset, i);
+ columnOffsetStart += DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+ }
+
+ if (offsetColumnIndexPairs.length > zerothSegmentMaxColumns) {
+ // read the rest of the columns from the segment stream
+ segmentBuffers.readOffset(offsetColumnIndexPairs, zerothSegmentMaxColumns, maxNumberOfColumnsInAPage);
+ }
+ }
+
+ @Override
+ public BitSet markRequiredPageSegments(BitSet projectedColumns, int pageZeroId, boolean markAll) {
+ pageZeroSegmentsPages.clear();
+ // The zeroth segment is never marked; it is always read
+ if (numberOfPageZeroSegments == 1 || markAll) {
+ // mark all segments as required
+ pageZeroSegmentsPages.set(1, numberOfPageZeroSegments);
+ } else {
+ // Iterate over the projected columns and mark the segments that contain them
+ int currentIndex = projectedColumns.nextSetBit(zerothSegmentMaxColumns);
+ while (currentIndex >= 0) {
+ int rangeEnd = projectedColumns.nextClearBit(currentIndex); // exclusive
+
+ int fromSegmentIndex = (currentIndex - zerothSegmentMaxColumns) / maxNumberOfColumnsInAPage + 1;
+ int toSegmentIndex = (rangeEnd - 1 - zerothSegmentMaxColumns) / maxNumberOfColumnsInAPage + 1;
+
+ if (fromSegmentIndex <= toSegmentIndex) {
+ pageZeroSegmentsPages.set(fromSegmentIndex, toSegmentIndex + 1); // inclusive range
+ }
+
+ currentIndex = projectedColumns.nextSetBit(rangeEnd);
+ }
+ }
+
+ return pageZeroSegmentsPages;
+ }
+
+ @Override
+ public int getHeaderSize() {
+ return EXTENDED_HEADER_SIZE;
+ }
+}
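A hedged sketch of the offset-lookup arithmetic used by getColumnOffset above. The page size, entry sizes, and zeroth-segment capacity are assumed values for illustration; the real ones come from the buffer cache page size and the extended header:

    // Assumed: 32 KiB page, 4-byte column offset entries, 16-byte filters (min/max longs).
    int maxNumberOfColumnsInAPage = 32768 / (4 + 16);           // 1638 columns per extra segment
    int zerothSegmentMaxColumns = 1000;                         // read from MAX_COLUMNS_IN_ZEROTH_SEGMENT
    int columnIndex = 2700;
    int segmentIndex = (columnIndex - zerothSegmentMaxColumns) / maxNumberOfColumnsInAPage;                // 1 -> second extra segment
    int columnIndexInRequiredSegment = (columnIndex - zerothSegmentMaxColumns) % maxNumberOfColumnsInAPage; // 62
    int segmentOffset = columnIndexInRequiredSegment * 4;       // byte offset of the column's offset entry inside that segment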
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroWriter.java
new file mode 100644
index 0000000..7f16d5e
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroWriter.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.zero.writers.multipage;
+
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.FLAG_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.HEADER_SIZE;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.LEFT_MOST_KEY_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.MEGA_LEAF_NODE_LENGTH;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.NUMBER_OF_COLUMNS_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.RIGHT_MOST_KEY_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.SIZE_OF_COLUMNS_OFFSETS_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.TUPLE_COUNT_OFFSET;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.asterix.column.bytes.stream.out.MultiPersistentPageZeroBufferBytesOutputStream;
+import org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IValuesWriter;
+
+/*
+[ PageZero Segment 0 ]
+──────────────────────────────────────────────────────────────────────────────
+| Headers |
+| ───────────────────────────────────────────────────────────────────────── |
+| TupleCountOffset |
+| MaxColumnsInZerothSegment |
+| LevelOffset |
+| NumberOfColumnsOffset |
+| LeftMostKeyOffset |
+| RightMostKeyOffset |
+| SizeOfColumnsOffsetsOffset |
+| MegaLeafNodeLength |
+| FlagOffset |
+| NextLeafOffset |
+| NumberOfPageSegments |
+| MaxColumnsInPageZeroSegment |
+
+| Min Primary Key |
+| Max Primary Key |
+| Primary Key Values |
+| [ offset₁, min₁, max₁ ] |
+| [ offset₂, min₂, max₂ ] |
+| [ offset₃, min₃, max₃ ] |
+| ... |
+
+[ PageZero Segment 1..N ]
+──────────────────────────────────────────────────────────────────────────────
+| Additional column metadata (same format) |
+*/
+public class DefaultColumnMultiPageZeroWriter implements IColumnPageZeroWriter {
+ // extended header slots: number of page zero segments, followed by the max columns allowed in the zeroth segment
+ public static final int NUMBER_OF_PAGE_ZERO_SEGMENTS_OFFSET = HEADER_SIZE;
+ public static final int MAX_COLUMNS_IN_ZEROTH_SEGMENT = HEADER_SIZE + Integer.BYTES;
+ public static final int EXTENDED_HEADER_SIZE = MAX_COLUMNS_IN_ZEROTH_SEGMENT + Integer.BYTES;
+
+ private final MultiPersistentPageZeroBufferBytesOutputStream segments;
+ private final DefaultColumnPageZeroWriter zerothSegmentWriter;
+ // maximum number of columns that can be laid out in the zeroth segment
+ private final int zerothSegmentMaxColumns;
+ private final int maximumNumberOfColumnsInAPage; // this is the maximum number of columns that can be laid out in a page
+
+ private int numberOfColumns;
+ private int numberOfColumnInZerothSegment;
+ private int numberOfPageZeroSegments; // this includes the zeroth segment
+
+ public DefaultColumnMultiPageZeroWriter(IColumnWriteMultiPageOp multiPageOp, int zerothSegmentMaxColumns,
+ int bufferCapacity) {
+ Mutable<IColumnWriteMultiPageOp> multiPageOpRef = new MutableObject<>();
+ multiPageOpRef.setValue(multiPageOp);
+ segments = new MultiPersistentPageZeroBufferBytesOutputStream(multiPageOpRef); // should this be populated at reset?
+ this.zerothSegmentWriter = new DefaultColumnPageZeroWriter();
+ this.zerothSegmentMaxColumns = zerothSegmentMaxColumns;
+ this.maximumNumberOfColumnsInAPage = bufferCapacity
+ / (DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
+ }
+
+ @Override
+ public void resetBasedOnColumns(int[] presentColumns, int numberOfColumns) throws HyracksDataException {
+ this.numberOfColumns = numberOfColumns;
+ this.numberOfColumnInZerothSegment = Math.min(numberOfColumns, zerothSegmentMaxColumns);
+ this.numberOfPageZeroSegments = calculateNumberOfPageZeroSegments(numberOfColumns,
+ numberOfColumnInZerothSegment, maximumNumberOfColumnsInAPage);
+ zerothSegmentWriter.resetBasedOnColumns(presentColumns, numberOfColumnInZerothSegment, EXTENDED_HEADER_SIZE);
+ if (numberOfPageZeroSegments > 1) {
+ // this many buffers need to be allocated to get contiguous pageIds
+ segments.reset(numberOfPageZeroSegments - 1);
+ }
+ }
+
+ @Override
+ public void resetBasedOnColumns(int[] presentColumns, int numberOfColumns, int headerSize)
+ throws HyracksDataException {
+ throw new UnsupportedOperationException(
+ "resetBasedOnColumns with headerSize is not supported in multi-page zero writer");
+ }
+
+ private int calculateNumberOfPageZeroSegments(int numberOfColumns, int numberOfColumnInZerothSegment,
+ int maximumNumberOfColumnsInAPage) {
+ // calculate the number of segments required to store the columns
+ int numberOfColumnsBeyondZerothSegment = numberOfColumns - numberOfColumnInZerothSegment;
+ if (numberOfColumnsBeyondZerothSegment <= 0) {
+ return 1; // only zeroth segment is needed
+ }
+ return 1 + (int) Math.ceil((double) numberOfColumnsBeyondZerothSegment / maximumNumberOfColumnsInAPage);
+ }
+
+ @Override
+ public void allocateColumns() {
+ // allocate the zeroth segment columns
+ zerothSegmentWriter.allocateColumns();
+ // the remaining segments do not need to be allocated,
+ // as they are completely filled with column entries
+ }
+
+ @Override
+ public void putColumnOffset(int columnIndex, int relativeColumnIndex, int offset) throws HyracksDataException {
+ try {
+ // for default writer, both columnIndex and relativeColumnIndex are the same
+ if (columnIndex < zerothSegmentMaxColumns) {
+ zerothSegmentWriter.putColumnOffset(columnIndex, relativeColumnIndex, offset);
+ } else {
+ // For columns beyond the zeroth segment, we need to write to the segments
+ int columnIndexInSegment = columnIndex - numberOfColumnInZerothSegment;
+ int requiredSegment = columnIndexInSegment / maximumNumberOfColumnsInAPage;
+ int columnIndexInRequiredSegment = columnIndexInSegment % maximumNumberOfColumnsInAPage;
+ int offsetInSegment = columnIndexInRequiredSegment * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+ segments.writeInSegment(requiredSegment, offsetInSegment, offset);
+ }
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public void putColumnFilter(int columnIndex, long normalizedMinValue, long normalizedMaxValue)
+ throws HyracksDataException {
+ try {
+ if (columnIndex < zerothSegmentMaxColumns) {
+ zerothSegmentWriter.putColumnFilter(columnIndex, normalizedMinValue, normalizedMaxValue);
+ } else {
+ // For columns beyond the zeroth segment, we need to write to the segments
+ int columnIndexInSegment = columnIndex - numberOfColumnInZerothSegment;
+ int requiredSegment = columnIndexInSegment / maximumNumberOfColumnsInAPage;
+ int columnIndexInRequiredSegment = columnIndexInSegment % maximumNumberOfColumnsInAPage;
+ int numberOfColumnsInSegment = findNumberOfColumnsInSegment(requiredSegment);
+ int segmentFilterOffset = numberOfColumnsInSegment * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+ int offsetInSegment =
+ segmentFilterOffset + columnIndexInRequiredSegment * DefaultColumnPageZeroWriter.FILTER_SIZE;
+ segments.writeInSegment(requiredSegment, offsetInSegment, normalizedMinValue);
+ segments.writeInSegment(requiredSegment, offsetInSegment + Long.BYTES, normalizedMaxValue);
+ }
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ private int findNumberOfColumnsInSegment(int segmentIndex) {
+ // segmentIndex is 0-based over the segments that follow the zeroth segment
+ if (segmentIndex == numberOfPageZeroSegments - 2) {
+ return numberOfColumns - numberOfColumnInZerothSegment
+ - (numberOfPageZeroSegments - 2) * maximumNumberOfColumnsInAPage;
+ }
+ // Every segment after the zeroth holds the maximum number of columns that fit in a page; only the last segment (handled above) may hold fewer.
+ return maximumNumberOfColumnsInAPage;
+ }
+
+ @Override
+ public void writePrimaryKeyColumns(IValuesWriter[] primaryKeyWriters) throws HyracksDataException {
+ // primary key columns are always written to the zeroth segment
+ zerothSegmentWriter.writePrimaryKeyColumns(primaryKeyWriters);
+ }
+
+ @Override
+ public byte flagCode() {
+ return MULTI_PAGE_DEFAULT_WRITER_FLAG;
+ }
+
+ @Override
+ public void flush(ByteBuffer buf, int numberOfTuples, ITupleReference minKey, ITupleReference maxKey,
+ AbstractColumnTupleWriter columnWriter, ITreeIndexTupleWriter rowTupleWriter) throws HyracksDataException {
+ buf.position(EXTENDED_HEADER_SIZE);
+ zerothSegmentWriter.setPageZero(buf);
+ buf.putInt(MEGA_LEAF_NODE_LENGTH, columnWriter.flush(this));
+ // Write min and max keys
+ int offset = buf.position();
+ buf.putInt(LEFT_MOST_KEY_OFFSET, offset);
+ offset += rowTupleWriter.writeTuple(minKey, buf.array(), offset);
+ buf.putInt(RIGHT_MOST_KEY_OFFSET, offset);
+ rowTupleWriter.writeTuple(maxKey, buf.array(), offset);
+
+ // Write page information
+ buf.putInt(TUPLE_COUNT_OFFSET, numberOfTuples);
+ buf.put(FLAG_OFFSET, flagCode());
+ buf.putInt(NUMBER_OF_COLUMNS_OFFSET, getNumberOfColumns());
+ buf.putInt(SIZE_OF_COLUMNS_OFFSETS_OFFSET, getColumnOffsetsSize());
+ // write the number of segments
+ buf.putInt(NUMBER_OF_PAGE_ZERO_SEGMENTS_OFFSET, numberOfPageZeroSegments);
+ // write the number of columns in the zeroth segment
+ buf.putInt(MAX_COLUMNS_IN_ZEROTH_SEGMENT, zerothSegmentMaxColumns);
+
+ // reset the collected meta info
+ segments.finish();
+ columnWriter.reset();
+ }
+
+ @Override
+ public int getNumberOfColumns() {
+ return numberOfColumns;
+ }
+
+ @Override
+ public boolean includeOrderedColumn(BitSet presentColumns, int columnIndex, boolean includeChildrenColumns) {
+ return true;
+ }
+
+ @Override
+ public int getPageZeroBufferCapacity() {
+ int pageSize = zerothSegmentWriter.getPageZeroBufferCapacity();
+ return pageSize * numberOfPageZeroSegments;
+ }
+
+ @Override
+ public int getRelativeColumnIndex(int columnIndex) {
+ return columnIndex;
+ }
+
+ @Override
+ public int getColumnOffsetsSize() {
+ return numberOfColumns * DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+ }
+
+ @Override
+ public void setPageZero(ByteBuffer pageZero) {
+ throw new IllegalStateException("setPageZero is not supported in multi-page zero writer");
+ }
+
+ @Override
+ public int getHeaderSize() {
+ return EXTENDED_HEADER_SIZE;
+ }
+}
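A quick, hedged check of the segment-count arithmetic in calculateNumberOfPageZeroSegments above; the page size and zeroth-segment capacity are assumptions chosen only for the example:

    int bufferCapacity = 32768;                                    // assumed buffer cache page size
    int maximumNumberOfColumnsInAPage = bufferCapacity / (4 + 16); // 1638 (offset entry + filter per column)
    int zerothSegmentMaxColumns = 1000;                            // assumed configuration
    int numberOfColumns = 5000;
    int numberOfColumnInZerothSegment = Math.min(numberOfColumns, zerothSegmentMaxColumns); // 1000
    int beyondZeroth = numberOfColumns - numberOfColumnInZerothSegment;                     // 4000
    int segments = 1 + (int) Math.ceil((double) beyondZeroth / maximumNumberOfColumnsInAPage); // 1 + 3 = 4 page zero segments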
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroReader.java
new file mode 100644
index 0000000..d555bdc
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroReader.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.zero.writers.multipage;
+
+import static org.apache.asterix.column.zero.writers.multipage.DefaultColumnMultiPageZeroWriter.MAX_COLUMNS_IN_ZEROTH_SEGMENT;
+import static org.apache.asterix.column.zero.writers.multipage.DefaultColumnMultiPageZeroWriter.NUMBER_OF_PAGE_ZERO_SEGMENTS_OFFSET;
+import static org.apache.asterix.column.zero.writers.multipage.SparseColumnMultiPageZeroWriter.MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.LEFT_MOST_KEY_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.MEGA_LEAF_NODE_LENGTH;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.NEXT_LEAF_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.NUMBER_OF_COLUMNS_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.RIGHT_MOST_KEY_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.TUPLE_COUNT_OFFSET;
+
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.asterix.column.zero.readers.SparseColumnPageZeroReader;
+import org.apache.asterix.column.zero.writers.SparseColumnPageZeroWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IntPairUtil;
+
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+
+public class SparseColumnMultiPageZeroReader extends AbstractColumnMultiPageZeroReader {
+ private final SparseColumnPageZeroReader zerothSegmentReader;
+ private final int maxNumberOfColumnsInAPage;
+ private final BitSet pageZeroSegmentsPages;
+ private final Int2IntOpenHashMap columnIndexToRelativeColumnIndex;
+
+ private int maxColumnIndexInZerothSegment;
+ private int numberOfColumnInZerothSegment;
+ private int numberOfPageZeroSegments;
+ private int headerSize;
+ private ByteBuffer pageZeroBuf;
+
+ private final VoidPointable offsetPointable;
+
+ public SparseColumnMultiPageZeroReader(int bufferCapacity) {
+ super();
+ zerothSegmentReader = new SparseColumnPageZeroReader();
+ this.pageZeroSegmentsPages = new BitSet();
+ this.maxNumberOfColumnsInAPage = bufferCapacity
+ / (SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + SparseColumnPageZeroWriter.FILTER_SIZE);
+ this.offsetPointable = new VoidPointable();
+ this.columnIndexToRelativeColumnIndex = new Int2IntOpenHashMap();
+ columnIndexToRelativeColumnIndex.defaultReturnValue(-1);
+ }
+
+ @Override
+ public void resetStream(IColumnBufferProvider pageZeroSegmentBufferProvider) throws HyracksDataException {
+ segmentBuffers.reset(pageZeroSegmentBufferProvider);
+ }
+
+ @Override
+ public void reset(ByteBuffer pageZeroBuf) {
+ this.pageZeroBuf = pageZeroBuf;
+ numberOfPageZeroSegments = pageZeroBuf.getInt(NUMBER_OF_PAGE_ZERO_SEGMENTS_OFFSET);
+ numberOfColumnInZerothSegment = pageZeroBuf.getInt(MAX_COLUMNS_IN_ZEROTH_SEGMENT);
+ maxColumnIndexInZerothSegment = pageZeroBuf.getInt(MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET);
+ headerSize = MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET + numberOfPageZeroSegments * Integer.BYTES;
+ zerothSegmentReader.reset(pageZeroBuf, Math.min(numberOfColumnInZerothSegment, getNumberOfPresentColumns()),
+ headerSize);
+ columnIndexToRelativeColumnIndex.clear();
+ }
+
+ @Override
+ public void reset(ByteBuffer pageZeroBuf, int headerSize) {
+ throw new UnsupportedOperationException("This method is not supported for multi-page zero readers.");
+ }
+
+ @Override
+ public int getColumnOffset(int columnIndex) throws HyracksDataException {
+ try {
+ if (columnIndex <= maxColumnIndexInZerothSegment) {
+ return zerothSegmentReader.getColumnOffset(columnIndex);
+ } else {
+ int segmentIndex = findSegment(columnIndex) - 1;
+ int relativeColumnIndex = findRelativeColumnIndex(columnIndex);
+ int columnIndexInRequiredSegment =
+ (relativeColumnIndex - numberOfColumnInZerothSegment) % maxNumberOfColumnsInAPage;
+ int segmentOffset =
+ columnIndexInRequiredSegment * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + Integer.BYTES; // skipping 4 bytes of columnIndex
+ segmentBuffers.read(segmentIndex, offsetPointable, segmentOffset,
+ SparseColumnPageZeroWriter.COLUMN_INDEX_SIZE);
+ return IntegerPointable.getInteger(offsetPointable.getByteArray(), offsetPointable.getStartOffset());
+ }
+ } catch (EOFException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ private int findSegment(int columnIndex) {
+ // Finds the 0-based segment index (including the zeroth segment) that holds the given columnIndex.
+ if (numberOfPageZeroSegments == 1) {
+ // only zeroth segment is present
+ return -1;
+ }
+ // gives 0 based segment index (0 for zeroth segment, 1 for first segment, etc.)
+ if (columnIndex <= maxColumnIndexInZerothSegment) {
+ return 0;
+ } else {
+ int start = 0;
+ int end = numberOfPageZeroSegments - 1;
+ int resultSegment = -1;
+ while (start <= end) {
+ int mid = (start + end) / 2;
+ int segmentColumnIndex =
+ pageZeroBuf.getInt(MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET + mid * Integer.BYTES);
+ if (segmentColumnIndex >= columnIndex) {
+ resultSegment = mid;
+ end = mid - 1; // continue searching in the left half
+ } else {
+ start = mid + 1;
+ }
+ }
+ return resultSegment;
+ }
+ }
+
+ private int findRelativeColumnIndex(int columnIndex) throws HyracksDataException {
+ if (columnIndexToRelativeColumnIndex.get(columnIndex) != -1) {
+ return columnIndexToRelativeColumnIndex.get(columnIndex);
+ }
+ if (columnIndex <= maxColumnIndexInZerothSegment) {
+ return zerothSegmentReader.getRelativeColumnIndex(columnIndex);
+ } else {
+ int segmentIndex = findSegment(columnIndex);
+ if (segmentIndex == -1) {
+ return -1;
+ }
+ segmentIndex -= 1; // findSegment is 0-based over all segments (including the zeroth),
+ // so subtract 1 to address the corresponding buffer in the segment stream
+ int numberOfColumnsInSegment =
+ segmentIndex == numberOfPageZeroSegments - 2
+ ? getNumberOfPresentColumns() - numberOfColumnInZerothSegment
+ - (numberOfPageZeroSegments - 2) * maxNumberOfColumnsInAPage
+ : maxNumberOfColumnsInAPage;
+ int segmentColumnIndex =
+ segmentBuffers.findColumnIndexInSegment(segmentIndex, columnIndex, numberOfColumnsInSegment);
+ if (segmentColumnIndex == -1) {
+ return -1;
+ }
+ int relativeIndex =
+ numberOfColumnInZerothSegment + segmentIndex * maxNumberOfColumnsInAPage + segmentColumnIndex;
+ columnIndexToRelativeColumnIndex.put(columnIndex, relativeIndex);
+ return relativeIndex;
+ }
+ }
+
+ private int findNumberOfColumnsInSegment(int segmentIndex) {
+ // segmentIndex is 0-based over the segments that follow the zeroth segment
+ if (segmentIndex == numberOfPageZeroSegments - 2) {
+ return getNumberOfPresentColumns() - numberOfColumnInZerothSegment
+ - (numberOfPageZeroSegments - 2) * maxNumberOfColumnsInAPage;
+ }
+ // Every segment after the zeroth holds the maximum number of columns that fit in a page; only the last segment (handled above) may hold fewer.
+ return maxNumberOfColumnsInAPage;
+ }
+
+ @Override
+ public long getColumnFilterMin(int columnIndex) throws HyracksDataException {
+ try {
+ if (columnIndex <= maxColumnIndexInZerothSegment) {
+ return zerothSegmentReader.getColumnFilterMin(columnIndex);
+ } else {
+ int segmentIndex = findSegment(columnIndex) - 1;
+ int relativeColumnIndex = findRelativeColumnIndex(columnIndex);
+ int columnIndexInRequiredSegment =
+ (relativeColumnIndex - numberOfColumnInZerothSegment) % maxNumberOfColumnsInAPage;
+ int segmentOffset =
+ findNumberOfColumnsInSegment(segmentIndex) * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+ segmentOffset += columnIndexInRequiredSegment * SparseColumnPageZeroWriter.FILTER_SIZE;
+ segmentBuffers.read(segmentIndex, offsetPointable, segmentOffset, Long.BYTES);
+ return LongPointable.getLong(offsetPointable.getByteArray(), offsetPointable.getStartOffset());
+ }
+ } catch (EOFException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public long getColumnFilterMax(int columnIndex) throws HyracksDataException {
+ try {
+ if (columnIndex <= maxColumnIndexInZerothSegment) {
+ return zerothSegmentReader.getColumnFilterMax(columnIndex);
+ } else {
+ int segmentIndex = findSegment(columnIndex) - 1;
+ int relativeColumnIndex = findRelativeColumnIndex(columnIndex);
+ int columnIndexInRequiredSegment =
+ (relativeColumnIndex - numberOfColumnInZerothSegment) % maxNumberOfColumnsInAPage;
+ int segmentOffset =
+ findNumberOfColumnsInSegment(segmentIndex) * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+ segmentOffset += columnIndexInRequiredSegment * SparseColumnPageZeroWriter.FILTER_SIZE;
+ segmentOffset += Long.BYTES; // skip min filter
+ segmentBuffers.read(segmentIndex, offsetPointable, segmentOffset, Long.BYTES);
+ return LongPointable.getLong(offsetPointable.getByteArray(), offsetPointable.getStartOffset());
+ }
+ } catch (EOFException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public void skipFilters() {
+ zerothSegmentReader.skipFilters();
+ }
+
+ @Override
+ public void skipColumnOffsets() {
+ zerothSegmentReader.skipColumnOffsets();
+ }
+
+ @Override
+ public int getTupleCount() {
+ return pageZeroBuf.getInt(TUPLE_COUNT_OFFSET);
+ }
+
+ @Override
+ public int getLeftMostKeyOffset() {
+ return pageZeroBuf.getInt(LEFT_MOST_KEY_OFFSET);
+ }
+
+ @Override
+ public int getRightMostKeyOffset() {
+ return pageZeroBuf.getInt(RIGHT_MOST_KEY_OFFSET);
+ }
+
+ @Override
+ public int getNumberOfPresentColumns() {
+ return pageZeroBuf.getInt(NUMBER_OF_COLUMNS_OFFSET);
+ }
+
+ @Override
+ public int getRelativeColumnIndex(int columnIndex) throws HyracksDataException {
+ return findRelativeColumnIndex(columnIndex);
+ }
+
+ @Override
+ public int getNextLeaf() {
+ return pageZeroBuf.getInt(NEXT_LEAF_OFFSET);
+ }
+
+ @Override
+ public int getMegaLeafNodeLengthInBytes() {
+ return pageZeroBuf.getInt(MEGA_LEAF_NODE_LENGTH);
+ }
+
+ @Override
+ public int getPageZeroCapacity() {
+ return pageZeroBuf.capacity();
+ }
+
+ @Override
+ public boolean isValidColumn(int columnIndex) throws HyracksDataException {
+ return findRelativeColumnIndex(columnIndex) != -1;
+ }
+
+ @Override
+ public void getAllColumns(BitSet presentColumns) {
+ int columnOffsetStart = headerSize;
+ for (int i = 0; i < Math.min(getNumberOfPresentColumns(), numberOfColumnInZerothSegment); i++) {
+ int columnIndex = pageZeroBuf.getInt(columnOffsetStart);
+ presentColumns.set(columnIndex);
+ columnOffsetStart += SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+ }
+ if (getNumberOfPresentColumns() > numberOfColumnInZerothSegment) {
+ // read the rest of the columns from the segment stream
+ int columnsInLastSegment = getNumberOfPresentColumns() - numberOfColumnInZerothSegment
+ - (numberOfPageZeroSegments - 2) * maxNumberOfColumnsInAPage;
+ segmentBuffers.readAllColumns(presentColumns, numberOfPageZeroSegments, maxNumberOfColumnsInAPage,
+ columnsInLastSegment);
+ }
+ }
+
+ @Override
+ public ByteBuffer getPageZeroBuf() {
+ throw new UnsupportedOperationException("This method is not supported for multi-page zero readers.");
+ }
+
+ @Override
+ public void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
+ // OffsetColumnIndexPairs is of size getNumberOfPresentColumns() + 1
+ int columnOffsetStart = headerSize;
+ for (int i = 0; i < Math.min(offsetColumnIndexPairs.length - 1, numberOfColumnInZerothSegment); i++) {
+ int columnIndex = pageZeroBuf.getInt(columnOffsetStart);
+ int columnOffset = pageZeroBuf.getInt(columnOffsetStart + SparseColumnPageZeroWriter.COLUMN_INDEX_SIZE);
+ offsetColumnIndexPairs[i] = IntPairUtil.of(columnOffset, columnIndex);
+ columnOffsetStart += SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+ }
+
+ if (offsetColumnIndexPairs.length - 1 > numberOfColumnInZerothSegment) {
+ // read the rest of the columns from the segment stream
+ int columnsInLastSegment = getNumberOfPresentColumns() - numberOfColumnInZerothSegment
+ - (numberOfPageZeroSegments - 2) * maxNumberOfColumnsInAPage;
+ segmentBuffers.readSparseOffset(offsetColumnIndexPairs, numberOfPageZeroSegments, maxNumberOfColumnsInAPage,
+ columnsInLastSegment);
+ }
+ }
+
+ @Override
+ public int getNumberOfPageZeroSegments() {
+ return numberOfPageZeroSegments;
+ }
+
+ @Override
+ public BitSet getPageZeroSegmentsPages() {
+ return pageZeroSegmentsPages;
+ }
+
+ @Override
+ public int getHeaderSize() {
+ return headerSize;
+ }
+
+ @Override
+ public BitSet markRequiredPageSegments(BitSet projectedColumns, int pageZeroId, boolean markAll) {
+ pageZeroSegmentsPages.clear();
+ // The zeroth segment is never marked; it is always read
+ if (numberOfPageZeroSegments == 1 || markAll) {
+ // mark all segments as required
+ pageZeroSegmentsPages.set(1, numberOfPageZeroSegments);
+ } else {
+ // Iterate over the projected columns and mark the segments that contain them
+ int currentIndex = projectedColumns.nextSetBit(maxColumnIndexInZerothSegment + 1);
+ while (currentIndex >= 0) {
+ int rangeEnd = projectedColumns.nextClearBit(currentIndex); // exclusive
+ int startSegmentIndex = findSegment(currentIndex);
+ int endSegmentIndex = findSegment(rangeEnd - 1);
+
+ if (startSegmentIndex <= endSegmentIndex) {
+ pageZeroSegmentsPages.set(startSegmentIndex, endSegmentIndex + 1);
+ }
+
+ currentIndex = projectedColumns.nextSetBit(rangeEnd);
+ }
+ }
+ return pageZeroSegmentsPages;
+ }
+}
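A sketch of the binary search behind findSegment: the extended header stores, for each segment, the largest absolute columnIndex it contains, and the reader picks the first segment whose stored maximum covers the requested index. The header values below are hypothetical:

    int[] maxColumnIndexPerSegment = { 40, 700, 1500 };  // hypothetical per-segment maxima from the header
    int columnIndex = 900;
    int low = 0, high = maxColumnIndexPerSegment.length - 1, segment = -1;
    while (low <= high) {
        int mid = (low + high) >>> 1;
        if (maxColumnIndexPerSegment[mid] >= columnIndex) {
            segment = mid;        // candidate; keep searching left for the first segment that covers the index
            high = mid - 1;
        } else {
            low = mid + 1;
        }
    }
    // segment == 2, i.e. the third page zero segment; the reader then subtracts 1 to index the segment stream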
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroWriter.java
new file mode 100644
index 0000000..695ee6e
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroWriter.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.zero.writers.multipage;
+
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.FLAG_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.HEADER_SIZE;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.LEFT_MOST_KEY_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.MEGA_LEAF_NODE_LENGTH;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.NUMBER_OF_COLUMNS_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.RIGHT_MOST_KEY_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.SIZE_OF_COLUMNS_OFFSETS_OFFSET;
+import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.TUPLE_COUNT_OFFSET;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.asterix.column.bytes.stream.out.MultiPersistentPageZeroBufferBytesOutputStream;
+import org.apache.asterix.column.zero.writers.SparseColumnPageZeroWriter;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IValuesWriter;
+
+/*
+[ PageZero Segment 0 ]
+──────────────────────────────────────────────────────────────────────────────
+| Headers |
+| ───────────────────────────────────────────────────────────────────────── |
+| TupleCountOffset |
+| MaxColumnsInZerothSegment |
+| LevelOffset |
+| NumberOfColumnsOffset |
+| LeftMostKeyOffset |
+| RightMostKeyOffset |
+| SizeOfColumnsOffsetsOffset |
+| MegaLeafNodeLength |
+| FlagOffset |
+| NextLeafOffset |
+| NumberOfPageSegments |
+| MaxColumnIndexInZerothSegment |
+| MaxColumnIndexInFirstSegment |
+| ... (one MaxColumnIndex entry per page zero segment) |
+
+| Min Primary Key |
+| Max Primary Key |
+| Primary Key Values |
+| [ offset₁, min₁, max₁ ] |
+| [ offset₂, min₂, max₂ ] |
+| [ offset₃, min₃, max₃ ] |
+| ... |
+
+[ PageZero Segment 1..N ]
+──────────────────────────────────────────────────────────────────────────────
+| Additional column metadata (same format) |
+*/
+public class SparseColumnMultiPageZeroWriter implements IColumnPageZeroWriter {
+ // For storing the last columnIndex in the i-th segment
+ public static final int NUMBER_OF_PAGE_ZERO_SEGMENTS_OFFSET = HEADER_SIZE;
+ public static final int MAX_COLUMNS_IN_ZEROTH_SEGMENT_OFFSET = NUMBER_OF_PAGE_ZERO_SEGMENTS_OFFSET + 4;
+ public static final int MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET = MAX_COLUMNS_IN_ZEROTH_SEGMENT_OFFSET + 4;
+
+ private final MultiPersistentPageZeroBufferBytesOutputStream segments;
+ private final SparseColumnPageZeroWriter zerothSegmentWriter;
+ private final int maximumNumberOfColumnsInAPage;
+ private final int zerothSegmentMaxColumns;
+ private int[] presentColumns;
+ private int numberOfPresentColumns;
+ private int numberOfPageZeroSegments;
+ private int numberOfColumnInZerothSegment;
+
+ public SparseColumnMultiPageZeroWriter(IColumnWriteMultiPageOp multiPageOp, int zerothSegmentMaxColumns,
+ int bufferCachePageSize) {
+ Mutable<IColumnWriteMultiPageOp> multiPageOpRef = new MutableObject<>();
+ multiPageOpRef.setValue(multiPageOp);
+ segments = new MultiPersistentPageZeroBufferBytesOutputStream(multiPageOpRef);
+ this.zerothSegmentMaxColumns = zerothSegmentMaxColumns;
+ this.zerothSegmentWriter = new SparseColumnPageZeroWriter();
+ this.maximumNumberOfColumnsInAPage = bufferCachePageSize
+ / (SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + SparseColumnPageZeroWriter.FILTER_SIZE);
+ }
+
+ @Override
+ public void resetBasedOnColumns(int[] presentColumns, int numberOfColumns) throws HyracksDataException {
+ this.presentColumns = presentColumns;
+ this.numberOfPresentColumns = presentColumns.length;
+ this.numberOfColumnInZerothSegment = Math.min(numberOfPresentColumns, zerothSegmentMaxColumns);
+ this.numberOfPageZeroSegments = calculateNumberOfPageZeroSegments(numberOfPresentColumns,
+ numberOfColumnInZerothSegment, maximumNumberOfColumnsInAPage);
+ int headerSize = MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET + numberOfPageZeroSegments * Integer.BYTES;
+ zerothSegmentWriter.resetInnerBasedOnColumns(presentColumns, numberOfColumnInZerothSegment, headerSize);
+ if (numberOfPageZeroSegments > 1) {
+ segments.reset(numberOfPageZeroSegments - 1);
+ }
+ }
+
+ @Override
+ public void resetBasedOnColumns(int[] presentColumns, int numberOfColumns, int headerSize)
+ throws HyracksDataException {
+ throw new UnsupportedOperationException(
+ "resetBasedOnColumns with headerSize is not supported in multi-page zero writer");
+ }
+
+ @Override
+ public byte flagCode() {
+ return MULTI_PAGE_SPARSE_WRITER_FLAG;
+ }
+
+ private int calculateNumberOfPageZeroSegments(int numberOfColumns, int numberOfColumnInZerothSegment,
+ int maximumNumberOfColumnsInAPage) {
+ // calculate the number of segments required to store the columns
+ int numberOfColumnsBeyondZerothSegment = numberOfColumns - numberOfColumnInZerothSegment;
+ if (numberOfColumnsBeyondZerothSegment <= 0) {
+ return 1; // only zeroth segment is needed
+ }
+ return 1 + (int) Math.ceil((double) numberOfColumnsBeyondZerothSegment / maximumNumberOfColumnsInAPage);
+ }
+
+ @Override
+ public void allocateColumns() {
+ // allocate the zeroth segment columns
+ zerothSegmentWriter.allocateColumns();
+ }
+
+ @Override
+ public void putColumnOffset(int absoluteColumnIndex, int relativeColumnIndex, int offset)
+ throws HyracksDataException {
+ // for sparse writer, we need to find the relative column index in the present columns.
+ try {
+ if (relativeColumnIndex < zerothSegmentMaxColumns) {
+ // Write to the zeroth segment
+ zerothSegmentWriter.putColumnOffset(absoluteColumnIndex, relativeColumnIndex, offset);
+ } else {
+ int columnIndexInSegment = relativeColumnIndex - numberOfColumnInZerothSegment;
+ int requiredSegment = columnIndexInSegment / maximumNumberOfColumnsInAPage;
+ int columnIndexInRequiredSegment = columnIndexInSegment % maximumNumberOfColumnsInAPage;
+ int offsetInSegment = columnIndexInRequiredSegment * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+ segments.writeInSegment(requiredSegment, offsetInSegment, absoluteColumnIndex, offset);
+ }
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public void putColumnFilter(int relativeColumnIndex, long normalizedMinValue, long normalizedMaxValue)
+ throws HyracksDataException {
+ try {
+ if (relativeColumnIndex < zerothSegmentMaxColumns) {
+ zerothSegmentWriter.putColumnFilter(relativeColumnIndex, normalizedMinValue, normalizedMaxValue);
+ } else {
+ // For columns beyond the zeroth segment, we need to write to the segments
+ int columnIndexInSegment = relativeColumnIndex - numberOfColumnInZerothSegment;
+ int requiredSegment = columnIndexInSegment / maximumNumberOfColumnsInAPage;
+ int columnIndexInRequiredSegment = columnIndexInSegment % maximumNumberOfColumnsInAPage;
+ int numberOfColumnsInSegment = findNumberOfColumnsInSegment(requiredSegment);
+ int segmentFilterOffset = numberOfColumnsInSegment * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+ int offsetInSegment =
+ segmentFilterOffset + columnIndexInRequiredSegment * SparseColumnPageZeroWriter.FILTER_SIZE;
+ segments.writeInSegment(requiredSegment, offsetInSegment, normalizedMinValue);
+ segments.writeInSegment(requiredSegment, offsetInSegment + Long.BYTES, normalizedMaxValue);
+ }
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ private int findNumberOfColumnsInSegment(int segmentIndex) {
+ // starts from 1st segment, not from 0th segment
+ if (segmentIndex == numberOfPageZeroSegments - 2) {
+ return numberOfPresentColumns - numberOfColumnInZerothSegment
+ - (numberOfPageZeroSegments - 2) * maximumNumberOfColumnsInAPage;
+ }
+ // For segments beyond the zeroth segment, we can have maximum number of columns in a page, except the last segment.
+ return maximumNumberOfColumnsInAPage;
+ }
+
+ @Override
+ public void writePrimaryKeyColumns(IValuesWriter[] primaryKeyWriters) throws HyracksDataException {
+ zerothSegmentWriter.writePrimaryKeyColumns(primaryKeyWriters);
+ }
+
+ @Override
+ public int getNumberOfColumns() {
+ return numberOfPresentColumns;
+ }
+
+ @Override
+ public boolean includeOrderedColumn(BitSet presentColumns, int columnIndex, boolean includeChildrenColumns) {
+ return zerothSegmentWriter.includeOrderedColumn(presentColumns, columnIndex, includeChildrenColumns);
+ }
+
+ @Override
+ public int getPageZeroBufferCapacity() {
+ int pageSize = zerothSegmentWriter.getPageZeroBufferCapacity();
+ return pageSize * numberOfPageZeroSegments;
+ }
+
+ @Override
+ public int getRelativeColumnIndex(int columnIndex) {
+ int relativeColumnIndex =
+ zerothSegmentWriter.findColumnIndex(presentColumns, numberOfPresentColumns, columnIndex);
+ if (relativeColumnIndex == -1) {
+ throw new IllegalStateException("Column index " + relativeColumnIndex + " is out of bounds");
+ }
+ return relativeColumnIndex;
+ }
+
+ @Override
+ public int getColumnOffsetsSize() {
+ return numberOfPresentColumns * SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+ }
+
+ @Override
+ public void setPageZero(ByteBuffer pageZero) {
+ throw new IllegalStateException("setPageZero is not supported in multi-page zero writer");
+ }
+
+ @Override
+ public void flush(ByteBuffer buf, int numberOfTuples, ITupleReference minKey, ITupleReference maxKey,
+ AbstractColumnTupleWriter columnWriter, ITreeIndexTupleWriter rowTupleWriter) throws HyracksDataException {
+ zerothSegmentWriter.setPageZero(buf);
+ buf.putInt(MEGA_LEAF_NODE_LENGTH, columnWriter.flush(this));
+ // Write min and max keys
+ int offset = buf.position();
+ buf.putInt(LEFT_MOST_KEY_OFFSET, offset);
+ offset += rowTupleWriter.writeTuple(minKey, buf.array(), offset);
+ buf.putInt(RIGHT_MOST_KEY_OFFSET, offset);
+ rowTupleWriter.writeTuple(maxKey, buf.array(), offset);
+
+ // Write page information
+ buf.putInt(TUPLE_COUNT_OFFSET, numberOfTuples);
+ buf.put(FLAG_OFFSET, flagCode());
+ buf.putInt(NUMBER_OF_COLUMNS_OFFSET, getNumberOfColumns());
+ buf.putInt(SIZE_OF_COLUMNS_OFFSETS_OFFSET, getColumnOffsetsSize());
+ // write the number of segments
+ buf.putInt(NUMBER_OF_PAGE_ZERO_SEGMENTS_OFFSET, numberOfPageZeroSegments);
+ // write the number of columns stored in the zeroth segment
+ buf.putInt(MAX_COLUMNS_IN_ZEROTH_SEGMENT_OFFSET, numberOfColumnInZerothSegment);
+
+ // write the max columnIndex of each segment into the header
+ for (int i = 0; i < numberOfPageZeroSegments; i++) {
+ int columnIndexOffset = MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET + i * Integer.BYTES;
+ if (i == 0) {
+ int presentColumnIndex = numberOfColumnInZerothSegment - 1;
+ buf.putInt(columnIndexOffset, presentColumns[presentColumnIndex]);
+ } else if (i == numberOfPageZeroSegments - 1) {
+ buf.putInt(columnIndexOffset, presentColumns[numberOfPresentColumns - 1]);
+ } else {
+ int presentColumnIndex = numberOfColumnInZerothSegment + i * maximumNumberOfColumnsInAPage;
+ buf.putInt(columnIndexOffset, presentColumns[presentColumnIndex - 1]);
+ }
+ }
+
+ // reset the collected meta info
+ segments.finish();
+ columnWriter.reset();
+ }
+
+ @Override
+ public int getHeaderSize() {
+ return MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET + numberOfPageZeroSegments * Integer.BYTES;
+ }
+
+ public static int getHeaderSpace(int numberOfPageZeroSegments) {
+ return MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET + numberOfPageZeroSegments * Integer.BYTES;
+ }
+}
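The sparse extended header therefore grows with the number of segments. A hedged sketch of the size calculation matching getHeaderSpace above, with a hypothetical segment count:

    // Base leaf-frame header, plus 4 bytes for the segment count, 4 bytes for the zeroth-segment
    // column count, and one 4-byte max-columnIndex slot per page zero segment.
    int numberOfPageZeroSegments = 3; // hypothetical
    int headerSize = SparseColumnMultiPageZeroWriter.MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET
            + numberOfPageZeroSegments * Integer.BYTES;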
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/NoOpWriteMultiPageOp.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/NoOpWriteMultiPageOp.java
index 5c929c1..aa3cb71 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/NoOpWriteMultiPageOp.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/NoOpWriteMultiPageOp.java
@@ -35,6 +35,11 @@
}
@Override
+ public ByteBuffer confiscatePageZeroPersistent() throws HyracksDataException {
+ return null;
+ }
+
+ @Override
public ByteBuffer confiscateTemporary() throws HyracksDataException {
return null;
}
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/TestWriteMultiPageOp.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/TestWriteMultiPageOp.java
index 8e01740..556a87c 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/TestWriteMultiPageOp.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/TestWriteMultiPageOp.java
@@ -20,6 +20,7 @@
import java.nio.ByteBuffer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
public class TestWriteMultiPageOp implements IColumnWriteMultiPageOp {
@@ -37,6 +38,11 @@
}
@Override
+ public ByteBuffer confiscatePageZeroPersistent() throws HyracksDataException {
+ return dummyBufferCache.allocate(fileId).getBuffer();
+ }
+
+ @Override
public ByteBuffer confiscateTemporary() {
return dummyBufferCache.allocateTemporary();
}
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
index fe5ef54..faa2a87 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
@@ -19,8 +19,6 @@
package org.apache.asterix.column.test.bytes;
import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.HEADER_SIZE;
-import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.MEGA_LEAF_NODE_LENGTH;
-import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.NUMBER_OF_COLUMNS_OFFSET;
import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.TUPLE_COUNT_OFFSET;
import java.io.File;
@@ -190,14 +188,8 @@
protected void writeFullPage(ByteBuffer pageZero, AbstractColumnTupleWriter writer, int tupleCount)
throws HyracksDataException {
pageZero.clear();
- //Reserve the header space
- pageZero.position(HEADER_SIZE);
- pageZero.putInt(MEGA_LEAF_NODE_LENGTH, writer.flush(pageZero, new DefaultColumnPageZeroWriter()));
- //Write page header
- int numberOfColumn = writer.getAbsoluteNumberOfColumns(false);
- pageZero.putInt(TUPLE_COUNT_OFFSET, tupleCount);
- pageZero.putInt(NUMBER_OF_COLUMNS_OFFSET, numberOfColumn);
-
+ DefaultColumnPageZeroWriter pageZeroWriter = new DefaultColumnPageZeroWriter();
+ pageZeroWriter.flush(pageZero, tupleCount, writer);
}
protected boolean isFull(AbstractColumnTupleWriter columnWriter, int tupleCount, ITupleReference tuple) {
@@ -210,7 +202,7 @@
//Reserved for the number of pages
int requiredFreeSpace = HEADER_SIZE;
//Columns' Offsets
- requiredFreeSpace += columnWriter.getColumnOccupiedSpace(true);
+ requiredFreeSpace += columnWriter.getPageZeroWriterOccupiedSpace(100, true, false);
//Occupied space from previous writes
requiredFreeSpace += columnWriter.getPrimaryKeysEstimatedSize();
//New tuple required space
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/NoOpColumnBatchWriter.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/NoOpColumnBatchWriter.java
index ea4dbe6a..0038fbe 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/NoOpColumnBatchWriter.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/NoOpColumnBatchWriter.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.column.values.writer;
-import java.nio.ByteBuffer;
import java.util.PriorityQueue;
import org.apache.asterix.column.values.IColumnBatchWriter;
@@ -33,8 +32,8 @@
}
@Override
- public void setPageZeroWriter(ByteBuffer pageZero, IColumnPageZeroWriter pageZeroWriter,
- int[] presentColumnsIndexes, int numberOfColumns) {
+ public void setPageZeroWriter(IColumnPageZeroWriter pageZeroWriter, int[] presentColumnsIndexes,
+ int numberOfColumns) {
// NoOp
}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
index 973d9a1..86cac57 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
@@ -88,6 +88,10 @@
return cloudIOManager.punchHole(handle.getFileHandle(), offset, length);
}
+ public BufferCache getBufferCache() {
+ return bufferCache;
+ }
+
/**
* Whether the sweep operation should stop or proceed
* Stopping condition:
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
index b52f59f..705df6a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
@@ -64,7 +64,8 @@
*
* @return the size needed to store columns' offsets
*/
- public abstract int getColumnOccupiedSpace(boolean includeCurrentTupleColumns);
+ public abstract int getPageZeroWriterOccupiedSpace(int maxColumnsInPageZerothSegment,
+ boolean includeCurrentTupleColumns, boolean adaptive);
/**
* @return maximum number of tuples to be stored per page (i.e., page0)
@@ -88,8 +89,7 @@
*
* @return total flushed length (including page zero)
*/
- public abstract int flush(ByteBuffer pageZero, IColumnPageZeroWriter columnPageZeroWriter)
- throws HyracksDataException;
+ public abstract int flush(IColumnPageZeroWriter columnPageZeroWriter) throws HyracksDataException;
/**
* Close the current writer and release all allocated temporary buffers
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnWriteMultiPageOp.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnWriteMultiPageOp.java
index 2309fe1..b46b67d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnWriteMultiPageOp.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnWriteMultiPageOp.java
@@ -38,6 +38,8 @@
*/
ByteBuffer confiscatePersistent() throws HyracksDataException;
+ ByteBuffer confiscatePageZeroPersistent() throws HyracksDataException;
+
/**
* Persist all confiscated persistent buffers to disk
*/
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
index 98fa419..de2d1e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
@@ -25,10 +25,13 @@
import java.util.BitSet;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read.CloudColumnReadContext;
import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweepPlanner;
import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweeper;
import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import it.unimi.dsi.fastutil.ints.IntArrays;
import it.unimi.dsi.fastutil.longs.LongArrays;
@@ -38,6 +41,7 @@
* Computes columns offsets, lengths, and pages
*/
public final class ColumnRanges {
+ private static final Logger LOGGER = LogManager.getLogger();
private static final LongComparator OFFSET_COMPARATOR = IntPairUtil.FIRST_COMPARATOR;
private final int numberOfPrimaryKeys;
@@ -79,7 +83,7 @@
*
* @param leafFrame to compute the ranges for
*/
- public void reset(ColumnBTreeReadLeafFrame leafFrame) {
+ public void reset(ColumnBTreeReadLeafFrame leafFrame) throws HyracksDataException {
reset(leafFrame, EMPTY, EMPTY, EMPTY);
}
@@ -89,7 +93,7 @@
* @param leafFrame to compute the ranges for
* @param plan eviction plan
*/
- public void reset(ColumnBTreeReadLeafFrame leafFrame, BitSet plan) {
+ public void reset(ColumnBTreeReadLeafFrame leafFrame, BitSet plan) throws HyracksDataException {
reset(leafFrame, plan, EMPTY, EMPTY);
}
@@ -102,7 +106,7 @@
* @param cloudOnlyColumns locked columns that cannot be read from a local disk
*/
public void reset(ColumnBTreeReadLeafFrame leafFrame, BitSet requestedColumns, BitSet evictableColumns,
- BitSet cloudOnlyColumns) {
+ BitSet cloudOnlyColumns) throws HyracksDataException {
// Set leafFrame
this.leafFrame = leafFrame;
// Ensure arrays capacities (given the leafFrame's columns and pages)
@@ -122,6 +126,12 @@
int columnOrdinal = 0;
for (int i = 0; i < numberOfColumns; i++) {
+            if (offsetColumnIndexPairs[i] == 0) { // no column's offset can be zero
+ LOGGER.warn(
+ "Unexpected zero column offset at index {}. This may indicate a logic error or data inconsistency.",
+ i);
+ continue;
+ }
int columnIndex = getColumnIndexFromPair(offsetColumnIndexPairs[i]);
int offset = getOffsetFromPair(offsetColumnIndexPairs[i]);
int nextOffset = getOffsetFromPair(offsetColumnIndexPairs[i + 1]);
@@ -165,7 +175,7 @@
* @param columnIndex column index
* @return pageID
*/
- public int getColumnStartPageIndex(int columnIndex) {
+ public int getColumnStartPageIndex(int columnIndex) throws HyracksDataException {
int pageSize = leafFrame.getBuffer().capacity();
return getColumnPageIndex(leafFrame.getColumnOffset(columnIndex), pageSize);
}
@@ -176,7 +186,7 @@
* @param columnIndex column index
* @return number of pages
*/
- public int getColumnNumberOfPages(int columnIndex) {
+ public int getColumnNumberOfPages(int columnIndex) throws HyracksDataException {
int pageSize = leafFrame.getBuffer().capacity();
int offset = getColumnStartOffset(leafFrame.getColumnOffset(columnIndex), pageSize);
int firstBufferLength = pageSize - offset;
@@ -297,8 +307,14 @@
for (int i = 0; i < leafFrame.getNumberOfColumns(); i++) {
builder.append(String.format("%03d", i));
builder.append(":");
- int startPageId = getColumnStartPageIndex(i);
- int columnPagesCount = getColumnNumberOfPages(i);
+            int startPageId;
+            int columnPagesCount;
+            try {
+                startPageId = getColumnStartPageIndex(i);
+                columnPagesCount = getColumnNumberOfPages(i);
+            } catch (HyracksDataException e) {
+                throw new RuntimeException(e);
+            }
printColumnPages(builder, numberOfPages, startPageId, columnPagesCount);
}
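
ColumnRanges orders columns by their on-disk offset through packed offset/column-index longs (populateOffsetColumnIndexPairs, getOffsetFromPair, getColumnIndexFromPair). The standalone sketch below only illustrates that idea; the pack layout and helper names are assumptions, since IntPairUtil's actual API is not shown in this hunk.

    import java.util.Arrays;

    public class OffsetColumnPairSketch {
        // Pack the offset into the high 32 bits so that sorting the longs orders columns
        // by their on-disk offset (valid for non-negative offsets).
        static long pack(int offset, int columnIndex) {
            return ((long) offset << 32) | (columnIndex & 0xFFFFFFFFL);
        }

        static int offsetOf(long pair) {
            return (int) (pair >>> 32);
        }

        static int columnIndexOf(long pair) {
            return (int) pair;
        }

        public static void main(String[] args) {
            long[] pairs = { pack(900, 2), pack(128, 0), pack(512, 1) };
            Arrays.sort(pairs); // orders the columns by starting offset
            for (long p : pairs) {
                System.out.println("column " + columnIndexOf(p) + " starts at offset " + offsetOf(p));
            }
            // column 0 starts at offset 128, column 1 at 512, column 2 at 900
        }
    }
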
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/IColumnReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/IColumnReadContext.java
index 86a650b..e37b9b1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/IColumnReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/IColumnReadContext.java
@@ -43,6 +43,9 @@
ICachedPage pinNext(ColumnBTreeReadLeafFrame leafFrame, IBufferCache bufferCache, int fileId)
throws HyracksDataException;
+ void preparePageZeroSegments(ColumnBTreeReadLeafFrame leafFrame, IBufferCache bufferCache, int fileId)
+ throws HyracksDataException;
+
/**
* Prepare the columns' pages
* Notes:
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
index 8d64eec..2cefab3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
@@ -131,6 +131,23 @@
}
@Override
+ public void preparePageZeroSegments(ColumnBTreeReadLeafFrame leafFrame, IBufferCache bufferCache, int fileId)
+ throws HyracksDataException {
+        if (leafFrame.getNumberOfPageZeroSegments() <= 1) { // only the zeroth segment exists, and it is already pinned
+ return;
+ }
+
+ // pin the required page segments
+ mergedPageRanges.clear();
+ int pageZeroId = leafFrame.getPageId();
+ BitSet pageZeroSegmentRanges =
+ leafFrame.markRequiredPageZeroSegments(projectedColumns, pageZeroId, operation == MERGE);
+ // Merge the page zero segments ranges
+ mergePageZeroSegmentRanges(pageZeroSegmentRanges);
+ mergedPageRanges.pin(columnCtx, bufferCache, fileId, pageZeroId);
+ }
+
+ @Override
public void prepareColumns(ColumnBTreeReadLeafFrame leafFrame, IBufferCache bufferCache, int fileId)
throws HyracksDataException {
if (leafFrame.getTupleCount() == 0) {
@@ -139,9 +156,12 @@
columnRanges.reset(leafFrame, projectedColumns, plan, cloudOnlyColumns);
int pageZeroId = leafFrame.getPageId();
+ int numberOfPageZeroSegments = leafFrame.getNumberOfPageZeroSegments();
if (operation == MERGE) {
- pinAll(fileId, pageZeroId, leafFrame.getMegaLeafNodeNumberOfPages() - 1, bufferCache);
+ // will contain column pages along with page zero segments
+ pinAll(fileId, pageZeroId + numberOfPageZeroSegments - 1,
+ leafFrame.getMegaLeafNodeNumberOfPages() - numberOfPageZeroSegments, bufferCache);
} else {
pinProjected(fileId, pageZeroId, bufferCache);
}
@@ -198,6 +218,36 @@
mergedPageRanges.pin(columnCtx, bufferCache, fileId, pageZeroId);
}
+ private void mergePageZeroSegmentRanges(BitSet pageZeroSegmentRanges) {
+ // Since the 0th segment is already pinned, we can skip it
+ pageZeroSegmentRanges.clear(0);
+ if (pageZeroSegmentRanges.cardinality() == 0) {
+ // No page zero segments, nothing to merge
+ return;
+ }
+
+ int start = -1;
+ int prev = -1;
+
+ int current = pageZeroSegmentRanges.nextSetBit(0);
+ while (current >= 0) {
+ if (start == -1) {
+ // Start of a new range
+ start = current;
+ } else if (current != prev + 1) {
+ // Discontinuous: close the current range
+ mergedPageRanges.addRange(start, prev);
+ start = current;
+ }
+
+ prev = current;
+ current = pageZeroSegmentRanges.nextSetBit(current + 1);
+ }
+
+ // Close the final range
+ mergedPageRanges.addRange(start, prev);
+ }
+
@Override
public void release(IBufferCache bufferCache) throws HyracksDataException {
// Release might differ in the future if prefetching is supported
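
mergePageZeroSegmentRanges above coalesces consecutive set bits of the segment BitSet into contiguous pin ranges. A self-contained illustration of that merging follows; the range list and the printing are illustrative only, whereas the patch feeds each closed range into mergedPageRanges.addRange.

    import java.util.ArrayList;
    import java.util.BitSet;
    import java.util.List;

    public class SegmentRangeMergeSketch {
        // Coalesces consecutive set bits into [start, end] pairs, exactly as the method above
        // does before handing each pair to mergedPageRanges.addRange(start, end).
        static List<int[]> mergeSetBits(BitSet bits) {
            List<int[]> ranges = new ArrayList<>();
            int start = -1;
            int prev = -1;
            for (int current = bits.nextSetBit(0); current >= 0; current = bits.nextSetBit(current + 1)) {
                if (start == -1) {
                    start = current; // open a new range
                } else if (current != prev + 1) {
                    ranges.add(new int[] { start, prev }); // discontinuity: close the current range
                    start = current;
                }
                prev = current;
            }
            if (start != -1) {
                ranges.add(new int[] { start, prev }); // close the final range
            }
            return ranges;
        }

        public static void main(String[] args) {
            BitSet segments = new BitSet();
            segments.set(1, 4); // segments 1, 2, 3
            segments.set(6, 8); // segments 6, 7
            for (int[] range : mergeSetBits(segments)) {
                System.out.println("pin segments " + range[0] + ".." + range[1]);
            }
            // pin segments 1..3
            // pin segments 6..7
        }
    }
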
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/DefaultColumnReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/DefaultColumnReadContext.java
index 11275a4..5917f65 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/DefaultColumnReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/DefaultColumnReadContext.java
@@ -76,7 +76,14 @@
}
@Override
+ public void preparePageZeroSegments(ColumnBTreeReadLeafFrame leafFrame, IBufferCache bufferCache, int fileId)
+ throws HyracksDataException {
+ // NoOp
+ }
+
+ @Override
public void prepareColumns(ColumnBTreeReadLeafFrame leafFrame, IBufferCache bufferCache, int fileId) {
+        // If there are page zero segments, they are expected to already be present in the flash cache.
// NoOp
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java
index d12f649..5b837a1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java
@@ -20,6 +20,7 @@
import static org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils.isMergedComponent;
+import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -58,6 +59,7 @@
private final IntSet indexedColumns;
private final ISweepClock clock;
private final int evictionPlanReevaluationThreshold;
+ private final List<ICachedPage> pageZeroSegmentsTempHolder;
private int numberOfColumns;
private long lastAccess;
private int maxSize;
@@ -78,6 +80,7 @@
plan = new BitSet();
reevaluatedPlan = new BitSet();
punchableThreshold = INITIAL_PUNCHABLE_THRESHOLD;
+ pageZeroSegmentsTempHolder = new ArrayList<>();
this.evictionPlanReevaluationThreshold = evictionPlanReevaluationThreshold;
}
@@ -215,12 +218,19 @@
ICachedPage page = bufferCache.pin(dpid);
try {
leafFrame.setPage(page);
+ pageZeroSegmentsTempHolder.clear();
+ leafFrame.pinPageZeroSegments(columnBTree.getFileId(), columnBTree.getBulkloadLeafStart(),
+ pageZeroSegmentsTempHolder, bufferCache, null);
ranges.reset(leafFrame);
for (int i = 0; i < leafFrame.getNumberOfColumns(); i++) {
sizes[i] = Math.max(sizes[i], ranges.getColumnLength(i));
maxSize = Math.max(maxSize, sizes[i]);
}
} finally {
+ // unpin the segment pages
+ for (ICachedPage segmentPage : pageZeroSegmentsTempHolder) {
+ bufferCache.unpin(segmentPage);
+ }
bufferCache.unpin(page);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
index 9fc3b8d..1f0c7f4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
@@ -38,6 +38,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
import org.apache.hyracks.util.StorageUtil;
@@ -50,11 +51,13 @@
private final ColumnSweepLockInfo lockedColumns;
private final ColumnRanges ranges;
private final List<ILSMDiskComponent> sweepableComponents;
+ private final List<ICachedPage> segmentPagesTempHolder;
public ColumnSweeper(int numberOfPrimaryKeys) {
lockedColumns = new ColumnSweepLockInfo();
ranges = new ColumnRanges(numberOfPrimaryKeys);
sweepableComponents = new ArrayList<>();
+ segmentPagesTempHolder = new ArrayList<>();
}
public long sweep(BitSet plan, SweepContext context, IColumnTupleProjector sweepProjector)
@@ -175,6 +178,8 @@
columnsLocked = page0.trySweepLock(lockedColumns);
if (columnsLocked) {
leafFrame.setPage(page0);
+ leafFrame.pinPageZeroSegments(fileId, leafFrame.getPageId(), segmentPagesTempHolder,
+ context.getBufferCache(), SweepBufferCacheReadContext.INSTANCE);
ranges.reset(leafFrame, plan);
freedSpace += punchHoles(context, leafFrame);
}
@@ -182,6 +187,10 @@
if (columnsLocked) {
page0.sweepUnlock();
}
+ // unpin segment pages
+ for (ICachedPage cachedPage : segmentPagesTempHolder) {
+ context.unpin(cachedPage, bcOpCtx);
+ }
context.unpin(page0, bcOpCtx);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/AbstractColumnBTreeLeafFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/AbstractColumnBTreeLeafFrame.java
index f3a7c4f..f03195b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/AbstractColumnBTreeLeafFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/AbstractColumnBTreeLeafFrame.java
@@ -100,12 +100,11 @@
// Duplicate to avoid interference when scanning the dataset twice
this.buf = page.getBuffer().duplicate();
buf.clear();
- buf.position(HEADER_SIZE);
resetPageZeroReader();
}
protected void resetPageZeroReader() {
-
+ buf.position(HEADER_SIZE);
};
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
index 8d1080a..88fe5bf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
@@ -49,11 +49,14 @@
public final class ColumnBTreeBulkloader extends BTreeNSMBulkLoader implements IColumnWriteMultiPageOp {
private static final Logger LOGGER = LogManager.getLogger();
private final List<CachedPage> columnsPages;
+    private final List<ICachedPage> pageZeroSegments; // holds page zero segments 1 through n (the zeroth segment is page0 itself)
private final List<CachedPage> tempConfiscatedPages;
private final ColumnBTreeWriteLeafFrame columnarFrame;
private final AbstractColumnTupleWriter columnWriter;
private final ISplitKey lowKey;
private final IColumnWriteContext columnWriteContext;
+ private final int maxColumnsInPageZerothSegment;
+ private final boolean adaptiveWriter;
private boolean setLowKey;
private int tupleCount;
@@ -61,6 +64,7 @@
private int numberOfLeafNodes;
private int numberOfPagesInCurrentLeafNode;
private int maxNumberOfPagesForAColumn;
+ private int maxNumberOfPageZeroSegments; // Exclude the zeroth segment
private int maxNumberOfPagesInALeafNode;
private int maxTupleCount;
private int lastRequiredFreeSpace;
@@ -69,6 +73,7 @@
ITreeIndexFrame leafFrame, IBufferCacheWriteContext writeContext) throws HyracksDataException {
super(fillFactor, verifyInput, callback, index, leafFrame, writeContext);
columnsPages = new ArrayList<>();
+ pageZeroSegments = new ArrayList<>();
tempConfiscatedPages = new ArrayList<>();
columnWriteContext = (IColumnWriteContext) writeContext;
columnarFrame = (ColumnBTreeWriteLeafFrame) leafFrame;
@@ -78,10 +83,15 @@
lowKey.getTuple().setFieldCount(cmp.getKeyFieldCount());
setLowKey = true;
+ // Writer config
+        maxColumnsInPageZerothSegment = 40; // TODO: low value kept for testing; this should come from configuration.
+        adaptiveWriter = false; // TODO: this should come from configuration.
+
// For logging. Starts with 1 for page0
numberOfPagesInCurrentLeafNode = 1;
maxNumberOfPagesForAColumn = 0;
maxNumberOfPagesInALeafNode = 0;
+ maxNumberOfPageZeroSegments = 0;
numberOfLeafNodes = 1;
maxTupleCount = 0;
lastRequiredFreeSpace = 0;
@@ -116,10 +126,10 @@
//We reached the maximum number of tuples
return true;
}
- columnWriter.updateColumnMetadataForCurrentTuple(tuple);
- int requiredFreeSpace = AbstractColumnBTreeLeafFrame.HEADER_SIZE;
//Columns' Offsets
- requiredFreeSpace += columnWriter.getColumnOccupiedSpace(true);
+ columnWriter.updateColumnMetadataForCurrentTuple(tuple);
+ int requiredFreeSpace =
+ columnWriter.getPageZeroWriterOccupiedSpace(maxColumnsInPageZerothSegment, true, adaptiveWriter);
//Occupied space from previous writes
requiredFreeSpace += columnWriter.getPrimaryKeysEstimatedSize();
//min and max tuples' sizes
@@ -144,7 +154,8 @@
if (tupleCount > 0) {
splitKey.getTuple().resetByTupleOffset(splitKey.getBuffer().array(), 0);
try {
- columnarFrame.flush(columnWriter, tupleCount, lowKey.getTuple(), splitKey.getTuple());
+ columnarFrame.flush(columnWriter, tupleCount, maxColumnsInPageZerothSegment, lowKey.getTuple(),
+ splitKey.getTuple(), this);
} catch (Exception e) {
logState(e);
throw e;
@@ -173,7 +184,8 @@
if (tupleCount > 0) {
//We need to flush columns to confiscate all columns pages first before calling propagateBulk
try {
- columnarFrame.flush(columnWriter, tupleCount, lowKey.getTuple(), splitKey.getTuple());
+ columnarFrame.flush(columnWriter, tupleCount, maxColumnsInPageZerothSegment, lowKey.getTuple(),
+ splitKey.getTuple(), this);
} catch (Exception e) {
logState(e);
throw e;
@@ -190,7 +202,7 @@
* Write columns' pages first to ensure they (columns' pages) are written before pageZero.
* It ensures pageZero does not land in between columns' pages if compression is enabled
*/
- writeColumnsPages();
+ writeColumnAndSegmentPages();
//Then write page0
write(leafFrontier.page);
@@ -219,10 +231,34 @@
* Write columns' pages first to ensure they (columns' pages) are written before pageZero.
* It ensures pageZero does not land in between columns' pages if compression is enabled
*/
- writeColumnsPages();
+ writeColumnAndSegmentPages();
super.writeLastLeaf(page);
}
+ private void writeColumnAndSegmentPages() throws HyracksDataException {
+ for (ICachedPage c : columnsPages) {
+ write(c);
+ }
+
+ // For logging
+ int numberOfPagesInPersistedColumn = columnsPages.size();
+ maxNumberOfPagesForAColumn = Math.max(maxNumberOfPagesForAColumn, numberOfPagesInPersistedColumn);
+ numberOfPagesInCurrentLeafNode += numberOfPagesInPersistedColumn;
+ columnsPages.clear();
+
+ // persist page zero segments from 1 to the last segment
+ for (ICachedPage page : pageZeroSegments) {
+ write(page);
+ }
+
+ int numberOfPageZeroSegments = pageZeroSegments.size();
+ maxNumberOfPageZeroSegments = Math.max(maxNumberOfPageZeroSegments, numberOfPageZeroSegments);
+ pageZeroSegments.clear();
+
+ // Indicate to the columnWriteContext that all columns were persisted
+ columnWriteContext.columnsPersisted();
+ }
+
private void writeColumnsPages() throws HyracksDataException {
for (ICachedPage c : columnsPages) {
write(c);
@@ -244,6 +280,10 @@
bufferCache.returnPage(page, false);
}
+ for (ICachedPage page : pageZeroSegments) {
+ bufferCache.returnPage(page, false);
+ }
+
for (ICachedPage page : tempConfiscatedPages) {
bufferCache.returnPage(page, false);
}
@@ -291,6 +331,15 @@
}
@Override
+ public ByteBuffer confiscatePageZeroPersistent() throws HyracksDataException {
+ int pageId = freePageManager.takePage(metaFrame);
+ long dpid = BufferedFileHandle.getDiskPageId(fileId, pageId);
+ CachedPage page = (CachedPage) bufferCache.confiscatePage(dpid);
+ pageZeroSegments.add(page);
+ return page.getBuffer();
+ }
+
+ @Override
public void persist() throws HyracksDataException {
writeColumnsPages();
}
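
The bulkloader now budgets page zero space per segment: up to maxColumnsInPageZerothSegment columns in the zeroth segment, and roughly a full page worth of column metadata in each additional segment it confiscates via confiscatePageZeroPersistent. The back-of-the-envelope sketch below shows the resulting segment count; the per-segment capacity of 500 columns is an assumption used only for the example.

    public class PageZeroSegmentCountSketch {
        // How many page zero segments a mega leaf node needs, assuming the zeroth segment
        // describes at most maxColumnsInZerothSegment columns and every additional segment
        // describes at most columnsPerFullSegment columns.
        static int segmentsNeeded(int presentColumns, int maxColumnsInZerothSegment, int columnsPerFullSegment) {
            if (presentColumns <= maxColumnsInZerothSegment) {
                return 1;
            }
            int overflow = presentColumns - maxColumnsInZerothSegment;
            return 1 + (overflow + columnsPerFullSegment - 1) / columnsPerFullSegment;
        }

        public static void main(String[] args) {
            // With the test value of 40 columns in the zeroth segment and an assumed capacity of
            // 500 columns per additional segment, 1200 present columns need 1 + ceil(1160 / 500) = 4 segments.
            System.out.println(segmentsNeeded(1200, 40, 500)); // 4
        }
    }
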
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
index 2669d4b..6bcbd0f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
@@ -92,8 +92,9 @@
do {
page0 = context.pinNext(frame, bufferCache, fileId);
stats.getPageCounter().update(1);
- context.prepareColumns(frame, bufferCache, fileId);
+ context.preparePageZeroSegments(frame, bufferCache, fileId);
frameTuple.newPage();
+ context.prepareColumns(frame, bufferCache, fileId);
setCursorPosition();
nextLeafPage = frame.getNextLeaf();
} while (frame.getTupleCount() == 0 && nextLeafPage > 0);
@@ -131,8 +132,9 @@
frame.setPage(page0);
frame.setMultiComparator(originalKeyCmp);
if (frame.getTupleCount() > 0) {
- context.prepareColumns(frame, bufferCache, fileId);
+ context.preparePageZeroSegments(frame, bufferCache, fileId);
frameTuple.newPage();
+ context.prepareColumns(frame, bufferCache, fileId);
initCursorPosition(searchPred);
} else {
yieldFirstCall = false;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
index 58a62ba..4a78400 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
@@ -18,6 +18,10 @@
*/
package org.apache.hyracks.storage.am.lsm.btree.column.impls.btree;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleReference;
import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
@@ -25,10 +29,11 @@
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
-
public final class ColumnBTreeReadLeafFrame extends AbstractColumnBTreeLeafFrame {
private final AbstractColumnTupleReader columnarTupleReader;
private final ITreeIndexTupleReference leftMostTuple;
@@ -45,8 +50,23 @@
@Override
protected void resetPageZeroReader() {
- columnPageZeroReader = pageZeroWriterFlavorSelector.createPageZeroReader(getFlagByte());
+ columnPageZeroReader = pageZeroWriterFlavorSelector.createPageZeroReader(getFlagByte(), buf.capacity());
columnPageZeroReader.reset(buf);
+ buf.position(columnPageZeroReader.getHeaderSize());
+ }
+
+ public void pinPageZeroSegments(int fileId, int pageZeroId, List<ICachedPage> segmentPages,
+ IBufferCache bufferCache, IBufferCacheReadContext bcOpCtx) throws HyracksDataException {
+ // pins all the segments, used by the column planner and sweeper
+ int numberOfPageSegments = getNumberOfPageZeroSegments();
+ for (int i = 1; i < numberOfPageSegments; i++) {
+ long dpid = BufferedFileHandle.getDiskPageId(fileId, pageZeroId + i);
+ if (bcOpCtx != null) {
+ segmentPages.add(bufferCache.pin(dpid, bcOpCtx));
+ } else {
+ segmentPages.add(bufferCache.pin(dpid));
+ }
+ }
}
@Override
@@ -71,7 +91,7 @@
return rightMostTuple;
}
- public void getAllColumns(IntOpenHashSet presentColumns) {
+ public void getAllColumns(BitSet presentColumns) {
columnPageZeroReader.getAllColumns(presentColumns);
}
@@ -88,11 +108,15 @@
return BufferedFileHandle.getPageId(((CachedPage) page).getDiskPageId());
}
+ public int getNumberOfPageZeroSegments() {
+ return columnPageZeroReader.getNumberOfPageZeroSegments();
+ }
+
public int getNumberOfColumns() {
return columnPageZeroReader.getNumberOfPresentColumns();
}
- public int getColumnOffset(int columnIndex) {
+ public int getColumnOffset(int columnIndex) throws HyracksDataException {
// update the exception message.
if (!columnPageZeroReader.isValidColumn(columnIndex)) {
throw new IndexOutOfBoundsException(columnIndex + " >= " + getNumberOfColumns());
@@ -100,7 +124,7 @@
return columnPageZeroReader.getColumnOffset(columnIndex);
}
- public boolean isValidColumn(int columnIndex) {
+ public boolean isValidColumn(int columnIndex) throws HyracksDataException {
return columnPageZeroReader.isValidColumn(columnIndex);
}
@@ -112,6 +136,14 @@
return columnPageZeroReader.getMegaLeafNodeLengthInBytes();
}
+ public int getHeaderSize() {
+ return columnPageZeroReader.getHeaderSize();
+ }
+
+ public int getPageZeroSegmentCount() {
+ return columnPageZeroReader.getNumberOfPageZeroSegments();
+ }
+
// flag needs to be directly accessed from the buffer, as this will be used to choose the pageReader
public byte getFlagByte() {
return buf.get(FLAG_OFFSET);
@@ -148,4 +180,12 @@
public void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
columnPageZeroReader.populateOffsetColumnIndexPairs(offsetColumnIndexPairs);
}
+
+ public BitSet getPageZeroSegmentsPages() {
+ return columnPageZeroReader.getPageZeroSegmentsPages();
+ }
+
+ public BitSet markRequiredPageZeroSegments(BitSet projectedColumns, int pageZeroId, boolean markAll) {
+ return columnPageZeroReader.markRequiredPageSegments(projectedColumns, pageZeroId, markAll);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java
index f5b7893..acee2d5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java
@@ -22,6 +22,7 @@
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -47,34 +48,12 @@
buf.putInt(NEXT_LEAF_OFFSET, -1);
}
- void flush(AbstractColumnTupleWriter columnWriter, int numberOfTuples, ITupleReference minKey,
- ITupleReference maxKey) throws HyracksDataException {
- IColumnPageZeroWriter pageZeroWriter = pageZeroWriterFlavorSelector.getPageZeroWriter();
-
- // TODO(zero): Ideally, all these fields should be specific to the writer.
- // However, some of the position constants are accessed directly elsewhere,
- // so refactoring will require careful consideration to avoid breaking existing usage.
-
- // Prepare the space for writing the columns' information such as the primary keys
- buf.position(HEADER_SIZE);
- // Flush the columns to persistence pages and write the length of the mega leaf node in pageZero
- buf.putInt(MEGA_LEAF_NODE_LENGTH, columnWriter.flush(buf, pageZeroWriter));
- // Write min and max keys
- int offset = buf.position();
- buf.putInt(LEFT_MOST_KEY_OFFSET, offset);
- offset += rowTupleWriter.writeTuple(minKey, buf.array(), offset);
- buf.putInt(RIGHT_MOST_KEY_OFFSET, offset);
- rowTupleWriter.writeTuple(maxKey, buf.array(), offset);
-
- // Write page information
- buf.putInt(TUPLE_COUNT_OFFSET, numberOfTuples);
- buf.put(FLAG_OFFSET, pageZeroWriter.flagCode());
- buf.putInt(NUMBER_OF_COLUMNS_OFFSET, pageZeroWriter.getNumberOfColumns());
- // correct the offset's, this all should be deferred to writer
- buf.putInt(SIZE_OF_COLUMNS_OFFSETS_OFFSET, pageZeroWriter.getColumnOffsetsSize());
-
- // reset the collected meta info
- columnWriter.reset();
+ void flush(AbstractColumnTupleWriter columnWriter, int numberOfTuples, int zerothSegmentMaxColumns,
+ ITupleReference minKey, ITupleReference maxKey, IColumnWriteMultiPageOp multiPageOpRef)
+ throws HyracksDataException {
+ IColumnPageZeroWriter pageZeroWriter = pageZeroWriterFlavorSelector.getPageZeroWriter(multiPageOpRef,
+ zerothSegmentMaxColumns, getBuffer().capacity());
+ pageZeroWriter.flush(buf, numberOfTuples, minKey, maxKey, columnWriter, rowTupleWriter);
}
public AbstractColumnTupleWriter getColumnTupleWriter() {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java
index 5a625fc..db4f778 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java
@@ -19,18 +19,24 @@
package org.apache.hyracks.storage.am.lsm.btree.column.impls.btree;
import java.nio.ByteBuffer;
+import java.util.BitSet;
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
public interface IColumnPageZeroReader {
- void reset(ByteBuffer pageZeroBuf);
+ default void reset(ByteBuffer pageZeroBuf) {
+ reset(pageZeroBuf, AbstractColumnBTreeLeafFrame.HEADER_SIZE);
+ }
- int getColumnOffset(int columnIndex);
+ void reset(ByteBuffer pageZeroBuf, int headerSize);
- int getColumnFilterOffset(int columnIndex);
+ int getColumnOffset(int columnIndex) throws HyracksDataException;
- long getLong(int offset);
+ long getColumnFilterMin(int columnIndex) throws HyracksDataException;
+
+ long getColumnFilterMax(int columnIndex) throws HyracksDataException;
void skipFilters();
@@ -44,7 +50,7 @@
int getNumberOfPresentColumns();
- int getRelativeColumnIndex(int columnIndex);
+ int getRelativeColumnIndex(int columnIndex) throws HyracksDataException;
int getNextLeaf();
@@ -52,11 +58,21 @@
int getPageZeroCapacity();
- boolean isValidColumn(int columnIndex);
+ boolean isValidColumn(int columnIndex) throws HyracksDataException;
- void getAllColumns(IntOpenHashSet presentColumns);
+ void getAllColumns(BitSet presentColumns);
ByteBuffer getPageZeroBuf();
void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs);
+
+ int getNumberOfPageZeroSegments();
+
+ BitSet getPageZeroSegmentsPages();
+
+ int getHeaderSize();
+
+ void resetStream(IColumnBufferProvider pageZeroSegmentBufferProvider) throws HyracksDataException;
+
+ BitSet markRequiredPageSegments(BitSet projectedColumns, int pageZeroId, boolean markAll);
}
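
markRequiredPageSegments lets a reader pin only the page zero segments that describe projected columns. The sketch below shows the concept under assumptions: it works on segment indexes rather than page IDs, takes the per-segment maximum-column-index directory from the earlier sketch, and always keeps segment 0, which holds the header.

    import java.util.BitSet;

    public class RequiredSegmentsSketch {
        static BitSet markRequiredSegments(BitSet projectedColumns, int[] maxColumnIndexPerSegment, boolean markAll) {
            BitSet required = new BitSet();
            required.set(0); // the zeroth segment always holds the header and the first columns
            if (markAll) {
                required.set(0, maxColumnIndexPerSegment.length);
                return required;
            }
            for (int col = projectedColumns.nextSetBit(0); col >= 0; col = projectedColumns.nextSetBit(col + 1)) {
                for (int seg = 0; seg < maxColumnIndexPerSegment.length; seg++) {
                    if (col <= maxColumnIndexPerSegment[seg]) {
                        required.set(seg);
                        break; // segments are ordered by their maximum column index
                    }
                }
            }
            return required;
        }

        public static void main(String[] args) {
            int[] maxPerSegment = { 5, 11, 42 }; // e.g. the directory from the earlier sketch
            BitSet projected = new BitSet();
            projected.set(3);
            projected.set(20);
            System.out.println(markRequiredSegments(projected, maxPerSegment, false)); // {0, 2}
        }
    }
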
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroWriter.java
index 041415a..3795c47 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroWriter.java
@@ -22,6 +22,9 @@
import java.util.BitSet;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
/**
* Interface for writing column metadata to page zero of a column page.
@@ -39,14 +42,23 @@
/** Flag code for sparse page zero writer */
byte SPARSE_WRITER_FLAG = 1;
+ byte MULTI_PAGE_DEFAULT_WRITER_FLAG = 2;
+
+ byte MULTI_PAGE_SPARSE_WRITER_FLAG = 3;
+
+ int MIN_COLUMN_SPACE = 4 + 16; // offset + filter size
+
/**
* Initializes the writer with page zero buffer and column information.
- *
- * @param pageZeroBuf The page zero buffer to write to
+ *
* @param presentColumns Array of column indexes that are present in this page
* @param numberOfColumns Total number of columns in the schema (may be larger than presentColumns)
*/
- void reset(ByteBuffer pageZeroBuf, int[] presentColumns, int numberOfColumns);
+ void resetBasedOnColumns(int[] presentColumns, int numberOfColumns, int headerSize) throws HyracksDataException;
+
+ default void resetBasedOnColumns(int[] presentColumns, int numberOfColumns) throws HyracksDataException {
+ resetBasedOnColumns(presentColumns, numberOfColumns, AbstractColumnBTreeLeafFrame.HEADER_SIZE);
+ }
/**
* Returns the flag code that identifies this writer type.
@@ -69,7 +81,7 @@
* @param relativeColumnIndex The relative column index within this page (for sparse layouts)
* @param offset The byte offset where the column's data begins
*/
- void putColumnOffset(int absoluteColumnIndex, int relativeColumnIndex, int offset);
+ void putColumnOffset(int absoluteColumnIndex, int relativeColumnIndex, int offset) throws HyracksDataException;
/**
* Stores filter information (min/max values) for a column.
@@ -79,7 +91,8 @@
* @param normalizedMinValue The normalized minimum value in the column
* @param normalizedMaxValue The normalized maximum value in the column
*/
- void putColumnFilter(int relativeColumnIndex, long normalizedMinValue, long normalizedMaxValue);
+ void putColumnFilter(int relativeColumnIndex, long normalizedMinValue, long normalizedMaxValue)
+ throws HyracksDataException;
/**
* Writes primary key column data to page zero.
@@ -115,7 +128,7 @@
*
* @return the page zero buffer
*/
- ByteBuffer getPageZeroBuffer();
+ int getPageZeroBufferCapacity();
/**
* Maps an absolute column index to a relative index within this page.
@@ -133,4 +146,11 @@
* @return size in bytes of column offset storage
*/
int getColumnOffsetsSize();
+
+ void setPageZero(ByteBuffer pageZero);
+
+ void flush(ByteBuffer buf, int numberOfTuples, ITupleReference minKey, ITupleReference maxKey,
+ AbstractColumnTupleWriter columnWriter, ITreeIndexTupleWriter rowTupleWriter) throws HyracksDataException;
+
+ int getHeaderSize();
}
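
MIN_COLUMN_SPACE above reserves 4 bytes for a column's offset plus 16 bytes for its filter, i.e. 20 bytes of page zero metadata per present column. The rough estimate below shows how that bounds the number of columns one segment can describe; the page size and header size in the example are made-up numbers, and primary keys would reduce the count further.

    public class ColumnMetadataCapacitySketch {
        static final int MIN_COLUMN_SPACE = 4 + 16; // offset + filter size, as in the interface above

        static int maxColumnsPerSegment(int pageSize, int headerSize) {
            return (pageSize - headerSize) / MIN_COLUMN_SPACE;
        }

        public static void main(String[] args) {
            // e.g. a 128 KiB page with a 96-byte header (both numbers are assumptions)
            System.out.println(maxColumnsPerSegment(128 * 1024, 96)); // 6548 columns per segment
        }
    }
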
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroWriterFlavorSelector.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroWriterFlavorSelector.java
index fe2e24e..fa8e3f6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroWriterFlavorSelector.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroWriterFlavorSelector.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.storage.am.lsm.btree.column.impls.btree;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+
/**
* Strategy interface for selecting the optimal page zero writer implementation.
*
@@ -40,24 +42,27 @@
* @param spaceOccupiedByDefaultWriter Space in bytes required by the default writer
* @param spaceOccupiedBySparseWriter Space in bytes required by the sparse writer
*/
- void switchPageZeroWriterIfNeeded(int spaceOccupiedByDefaultWriter, int spaceOccupiedBySparseWriter);
+ void switchPageZeroWriterIfNeeded(int spaceOccupiedByDefaultWriter, int spaceOccupiedBySparseWriter,
+ boolean adaptive);
/**
* Creates the appropriate page zero reader for the given writer type.
- *
+ * <p>
* This method is used during deserialization to create a reader that matches
* the writer type used during serialization. The flag identifies which
* layout was used.
- *
- * @param flag The flag code identifying the writer type (0=default, 1=sparse)
+ *
+     * @param flag     The flag code identifying the writer type (0=default, 1=sparse, 2=multi-page default, 3=multi-page sparse)
+     * @param capacity the capacity of the page zero buffer
* @return the appropriate reader instance
*/
- IColumnPageZeroReader createPageZeroReader(byte flag);
+ IColumnPageZeroReader createPageZeroReader(byte flag, int capacity);
/**
* Returns the currently selected page zero writer instance.
*
* @return the writer instance selected by the most recent call to switchPageZeroWriterIfNeeded
*/
- IColumnPageZeroWriter getPageZeroWriter();
+ IColumnPageZeroWriter getPageZeroWriter(IColumnWriteMultiPageOp multiPageOpRef, int zerothSegmentMaxColumns,
+ int maximumColumnPerPageSegment);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
index a1c60a6..de59836 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
@@ -18,8 +18,6 @@
*/
package org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.tuples;
-import static org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.AbstractColumnBTreeLeafFrame.HEADER_SIZE;
-
import java.nio.ByteBuffer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -29,6 +27,7 @@
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.IColumnPageZeroReader;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -40,6 +39,7 @@
private static final String UNSUPPORTED_OPERATION_MSG = "Operation is not supported for column tuples";
private final int componentIndex;
protected final ColumnBTreeReadLeafFrame frame;
+ protected final IColumnBufferProvider pageZeroSegmentBufferProvider;
private final IColumnBufferProvider[] primaryKeyBufferProviders;
private final IColumnBufferProvider[] filterBufferProviders;
private final IColumnBufferProvider[] buffersProviders;
@@ -74,6 +74,7 @@
pinnedPages = new LongOpenHashSet();
int numberOfFilteredColumns = info.getNumberOfFilteredColumns();
filterBufferProviders = new IColumnBufferProvider[numberOfFilteredColumns];
+ pageZeroSegmentBufferProvider = new ColumnMultiPageZeroBufferProvider(multiPageOp, pinnedPages);
for (int i = 0; i < numberOfFilteredColumns; i++) {
int columnIndex = info.getFilteredColumnIndex(i);
if (columnIndex < 0) {
@@ -103,8 +104,9 @@
public void newPage() throws HyracksDataException {
tupleIndex = 0;
ByteBuffer pageZero = frame.getBuffer();
+        // should not be needed, as it has just been reset in the step above
pageZero.clear();
- pageZero.position(HEADER_SIZE);
+ pageZero.position(frame.getHeaderSize());
int numberOfTuples = frame.getTupleCount();
@@ -114,6 +116,13 @@
provider.reset(frame);
startPrimaryKey(provider, i, numberOfTuples);
}
+
+ // if the pageZero segments > 1, reset the page zero segment buffer provider
+ if (frame.getPageZeroSegmentCount() > 1) {
+ IColumnPageZeroReader pageZeroReader = frame.getColumnPageZeroReader();
+ pageZeroSegmentBufferProvider.reset(frame);
+ pageZeroReader.resetStream(pageZeroSegmentBufferProvider);
+ }
}
@Override
@@ -249,6 +258,9 @@
buffersProviders[i].releaseAll();
}
+ // release pageZero segment buffer provider
+ pageZeroSegmentBufferProvider.releaseAll();
+
maxNumberOfPinnedPages = Math.max(maxNumberOfPinnedPages, pinnedPages.size());
pinnedPages.clear();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
index 988d413..8737db3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
@@ -35,13 +35,13 @@
import it.unimi.dsi.fastutil.longs.LongSet;
-public final class ColumnMultiBufferProvider implements IColumnBufferProvider {
+public class ColumnMultiBufferProvider implements IColumnBufferProvider {
private final int columnIndex;
private final IColumnReadMultiPageOp multiPageOp;
private final Queue<ICachedPage> pages;
private final LongSet pinnedPages;
- private int numberOfRemainingPages;
- private int startPage;
+ protected int numberOfRemainingPages;
+ protected int startPage;
private int startOffset;
private int length;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiPageZeroBufferProvider.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiPageZeroBufferProvider.java
new file mode 100644
index 0000000..1556bea
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiPageZeroBufferProvider.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.tuples;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.longs.LongSet;
+
+public class ColumnMultiPageZeroBufferProvider implements IColumnBufferProvider {
+ private static final BitSet EMPTY_SEGMENTS = new BitSet();
+ private final IColumnReadMultiPageOp multiPageOp;
+ private final LongSet pinnedPages;
+ private final List<ICachedPage> pages; // stores from segment 1 to segment n (0 is not stored here)
+
+ private int startPage;
+ private int numberOfRemainingPages;
+ private BitSet pageZeroSegmentsPages;
+
+ public ColumnMultiPageZeroBufferProvider(IColumnReadMultiPageOp multiPageOp, LongSet pinnedPages) {
+ this.multiPageOp = multiPageOp;
+ this.pinnedPages = pinnedPages;
+ this.pages = new ArrayList<>();
+ }
+
+ @Override
+ public void reset(ColumnBTreeReadLeafFrame frame) throws HyracksDataException {
+ startPage = frame.getPageId() + 1;
+ numberOfRemainingPages = frame.getNumberOfPageZeroSegments() - 1; // zeroth segment is not counted
+ pageZeroSegmentsPages = frame.getPageZeroSegmentsPages();
+ if (pageZeroSegmentsPages == null) {
+ pageZeroSegmentsPages = EMPTY_SEGMENTS;
+ }
+ }
+
+ @Override
+ public void readAll(Queue<ByteBuffer> buffers) throws HyracksDataException {
+ throw new IllegalStateException("Reading all pages is not allowed for zero buffer provider.");
+ }
+
+ public int getNumberOfRemainingPages() {
+ return numberOfRemainingPages;
+ }
+
+ public void readAll(List<ByteBuffer> buffers, Int2IntMap segmentDir) throws HyracksDataException {
+ if (pageZeroSegmentsPages == EMPTY_SEGMENTS) {
+            // All the segments are expected to be present in the flash cache, but they still need to be pinned into the buffer cache.
+            // TODO: pin on a per-request basis, or prefetch all the segments?
+ return;
+ }
+ // traverse the set segments and read the pages
+ int currentIndex = pageZeroSegmentsPages.nextSetBit(0);
+ while (currentIndex != -1) {
+ int segmentIndex = currentIndex - 1; // segmentIndex starts from 1
+ ByteBuffer buffer = read(segmentIndex);
+ segmentDir.put(segmentIndex, buffers.size());
+ buffers.add(buffer);
+ currentIndex = pageZeroSegmentsPages.nextSetBit(currentIndex + 1);
+ }
+ }
+
+ public ByteBuffer read(int segmentIndex) throws HyracksDataException {
+ if (segmentIndex < 0 || segmentIndex >= numberOfRemainingPages) {
+ throw new IndexOutOfBoundsException("Segment index out of bounds: " + segmentIndex);
+ }
+ ICachedPage segment = readSegment(segmentIndex);
+ return segment.getBuffer();
+ }
+
+ @Override
+ public void releaseAll() throws HyracksDataException {
+ for (ICachedPage page : pages) {
+ multiPageOp.unpin(page);
+ }
+ pages.clear();
+ }
+
+ @Override
+ public ByteBuffer getBuffer() {
+ throw new UnsupportedOperationException("getBuffer() is not supported for multi-page zero buffer provider.");
+ }
+
+ @Override
+ public int getLength() {
+        throw new IllegalStateException("getLength() is not supported for multi-page zero buffer provider.");
+ }
+
+ private ICachedPage readSegment(int segmentIndex) throws HyracksDataException {
+ // The page segments are most likely to be present in the buffer cache,
+ // as the pages are pinned when a new pageZero is accessed.
+ ICachedPage segmentPage = multiPageOp.pin(startPage + segmentIndex);
+ pages.add(segmentPage);
+ pinnedPages.add(((CachedPage) segmentPage).getDiskPageId());
+ return segmentPage;
+ }
+
+ @Override
+ public int getColumnIndex() {
+        throw new IllegalStateException("getColumnIndex() is not supported for multi-page zero buffer provider.");
+ }
+}
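
readAll(buffers, segmentDir) above records, for every pinned segment, the position of its buffer in the caller's list. A small usage sketch of that directory follows; how the reader actually consumes it is an assumption, and the sketch only shows the map's contract (position in the list, -1 when the segment was not pinned).

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    import it.unimi.dsi.fastutil.ints.Int2IntMap;
    import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;

    public class SegmentDirectorySketch {
        public static void main(String[] args) {
            List<ByteBuffer> buffers = new ArrayList<>();
            Int2IntMap segmentDir = new Int2IntOpenHashMap();
            segmentDir.defaultReturnValue(-1);

            // pretend segments 2 and 5 were pinned, in that order
            segmentDir.put(2, buffers.size());
            buffers.add(ByteBuffer.allocate(16));
            segmentDir.put(5, buffers.size());
            buffers.add(ByteBuffer.allocate(16));

            int pos = segmentDir.get(5);
            ByteBuffer segmentBuffer = pos >= 0 ? buffers.get(pos) : null;
            System.out.println("segment 5 -> buffer #" + pos + ", capacity " + segmentBuffer.capacity());
            System.out.println("segment 3 pinned? " + (segmentDir.get(3) >= 0)); // false
        }
    }
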
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnSingleBufferProvider.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnSingleBufferProvider.java
index 3ae5c7d..cb953fa 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnSingleBufferProvider.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnSingleBufferProvider.java
@@ -37,7 +37,7 @@
}
@Override
- public void reset(ColumnBTreeReadLeafFrame frame) {
+ public void reset(ColumnBTreeReadLeafFrame frame) throws HyracksDataException {
int offset = frame.getColumnOffset(columnIndex);
this.buffer = frame.getBuffer().duplicate();
buffer.position(offset);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index a77ccdf..8169fb9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -500,7 +500,8 @@
for (ICachedPageInternal internalPage : cachedPages) {
CachedPage c = (CachedPage) internalPage;
if (c != null) {
- if (c.confiscated() || c.latch.getReadLockCount() != 0 || c.latch.getWriteHoldCount() != 0) {
+ if (c.confiscated() || c.latch.getReadLockCount() != 0 || c.latch.getWriteHoldCount() != 0
+ || c.pinCount.get() != 0) {
return false;
}
if (c.valid) {