[ASTERIXDB-3484] Support storing large values in metadata pages
- user model changes: no
- storage format changes: yes
- interface changes: yes
Details:
Add the support to store large key-value pairs
in the component's metadata page
Ext-ref: MB-62875
Change-Id: Ie3f5cb6820b98ef78841c49b13ae33436c71a99a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18633
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.001.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.001.ddl.sqlpp
new file mode 100644
index 0000000..71a618b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.001.ddl.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test if exists;
+CREATE DATAVERSE test;
+USE test;
+
+CREATE DATASET ColumnDataset
+PRIMARY KEY (id: int) WITH {
+ "storage-format": {"format" : "column"}
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.002.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.002.update.sqlpp
new file mode 100644
index 0000000..5adbda4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.002.update.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+USE test;
+
+-- This will add 5000 columns
+UPSERT INTO ColumnDataset (
+ SELECT VALUE object_add_fields({"id": x},
+ [{"field-name": "myBadLongGeneratedFieldName" || to_string(x), "field-value":x}])
+ FROM RANGE(1, 5000) x
+)
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.003.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.003.query.sqlpp
new file mode 100644
index 0000000..c7fd8b5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.003.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+-- This will read a large schema (chunked into two pieces given that the page size is 32KB)
+SELECT VALUE COUNT(*)
+FROM ColumnDataset c
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.004.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.004.query.sqlpp
new file mode 100644
index 0000000..cda8ed1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.004.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+-- Ensure values can be projected with large schemas
+SELECT VALUE c.myBadLongGeneratedFieldName751
+FROM ColumnDataset c
+WHERE c.myBadLongGeneratedFieldName751 IS NOT UNKNOWN
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/large-schema/large-schema.003.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/large-schema/large-schema.003.adm
new file mode 100644
index 0000000..e9c02da
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/large-schema/large-schema.003.adm
@@ -0,0 +1 @@
+5000
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/large-schema/large-schema.004.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/large-schema/large-schema.004.adm
new file mode 100644
index 0000000..29988c8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/large-schema/large-schema.004.adm
@@ -0,0 +1 @@
+751
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
index 2aba758..509d90b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
@@ -16599,6 +16599,11 @@
</compilation-unit>
</test-case>
<test-case FilePath="column">
+ <compilation-unit name="large-schema">
+ <output-dir compare="Text">large-schema</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="column">
<compilation-unit name="validation">
<output-dir compare="Text">validation</output-dir>
<expected-error>ASX1191: Merge policy 'correlated-prefix' is not supported with columnar storage format</expected-error>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressor.java
index 3a7e901..7129897 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressor.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressor.java
@@ -24,41 +24,60 @@
/**
* An API for block compressor/decompressor.
- *
+ * <p>
* Note: Should never allocate any buffer in compress/uncompress operations and it must be stateless to be thread safe.
*/
public interface ICompressorDecompressor {
/**
* Computes the required buffer size for <i>compress()</i>.
*
- * @param uBufferSize
- * The size of the uncompressed buffer.
+ * @param uBufferSize The size of the uncompressed buffer.
* @return The required buffer size for compression
*/
int computeCompressedBufferSize(int uBufferSize);
/**
+ * Compress <i>src</i> into <i>dest</i>
+ *
+ * @param src Uncompressed source buffer
+ * @param srcOffset Source offset
+ * @param srcLen Source length
+ * @param dest Destination buffer
+ * @param destOffset Destination offset
+ * @return compressed length
+ */
+ int compress(byte[] src, int srcOffset, int srcLen, byte[] dest, int destOffset) throws HyracksDataException;
+
+ /**
* Compress <i>uBuffer</i> into <i>cBuffer</i>
*
- * @param uBuffer
- * Uncompressed source buffer
- * @param cBuffer
- * Compressed destination buffer
+ * @param uBuffer Uncompressed source buffer
+ * @param cBuffer Compressed destination buffer
* @return Buffer after compression. ({@link ByteBuffer#limit()} is set to the compressed size
- * @throws HyracksDataException
*/
ByteBuffer compress(ByteBuffer uBuffer, ByteBuffer cBuffer) throws HyracksDataException;
/**
+ * Uncompress <i>src</i> into <i>dest</i>
+ *
+ * @param src Compressed source
+ * @param srcOffset Source offset
+ * @param srcLen Source length
+ * @param dest Destination buffer
+ * @param destOffset Destination offset
+ * @return uncompressed length
+ * @throws HyracksDataException An exception will be thrown if the <i>uBuffer</i> size is not sufficient.
+ */
+ int uncompress(byte[] src, int srcOffset, int srcLen, byte[] dest, int destOffset) throws HyracksDataException;
+
+ /**
* Uncompress <i>cBuffer</i> into <i>uBuffer</i>
*
- * @param cBuffer
- * Compressed source buffer
- * @param uBuffer
- * Uncompressed destination buffer
+ * @param cBuffer Compressed source buffer
+ * @param uBuffer Uncompressed destination buffer
* @return Buffer after decompression. ({@link ByteBuffer#limit()} is set to the uncompressed size
- * @throws HyracksDataException
- * An exception will be thrown if the <i>uBuffer</i> size is not sufficient.
+ * @throws HyracksDataException An exception will be thrown if the <i>uBuffer</i> size is not sufficient.
*/
ByteBuffer uncompress(ByteBuffer cBuffer, ByteBuffer uBuffer) throws HyracksDataException;
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IMetadataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IMetadataPageManager.java
index 58c837b..7909ed1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IMetadataPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IMetadataPageManager.java
@@ -29,6 +29,7 @@
public interface IMetadataPageManager extends IPageManager {
/**
* put the key value pair in the metadata page using the passed frame
+ *
* @param frame
* @param key
* @param value
@@ -38,19 +39,22 @@
/**
* get the value of the key from the metadata page using the passed frame
+ *
* @param frame
* @param key
* @param value
+ * @return true if the key exists, false otherwise
* @throws HyracksDataException
*/
- void get(ITreeIndexMetadataFrame frame, IValueReference key, IPointable value) throws HyracksDataException;
+ boolean get(ITreeIndexMetadataFrame frame, IValueReference key, IPointable value) throws HyracksDataException;
/**
- * @param frame
- * @param key
- * @return The byte offset in the index file for the entry with the passed key if the index is valid and the key
- * exists, returns -1 otherwise. use the passed frame to read the metadata page
- * @throws HyracksDataException
+ * @return page size
*/
- long getFileOffset(ITreeIndexMetadataFrame frame, IValueReference key) throws HyracksDataException;
+ int getPageSize();
+
+ /**
+ * @return free space of the current page
+ */
+ int getFreeSpace() throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexMetadataFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexMetadataFrame.java
index 7efc469..a898d59 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexMetadataFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexMetadataFrame.java
@@ -36,6 +36,7 @@
/**
* Set the page in the frame
+ *
* @param page
*/
void setPage(ICachedPage page);
@@ -53,6 +54,7 @@
/**
* Set the page level
+ *
* @param level
*/
void setLevel(byte level);
@@ -60,12 +62,14 @@
/**
* Get the next metadata page if this page is linked to other metadata pages
* Return a negative value otherwise
+ *
* @return
*/
int getNextMetadataPage();
/**
* Link this metadata page to another one
+ *
* @param nextPage
*/
void setNextMetadataPage(int nextPage);
@@ -77,37 +81,44 @@
/**
* Set the max page of the file
+ *
* @param maxPage
*/
void setMaxPage(int maxPage);
/**
* Get a free page from the page
+ *
* @return
*/
int getFreePage();
/**
* Get the remaining space in the metadata page
+ *
* @return
*/
int getSpace();
/**
* add a new free page to the metadata page
+ *
* @param freePage
*/
void addFreePage(int freePage);
/**
* get the value with the key = key
+ *
* @param key
* @param value
+ * @return true if the key exists, false otherwise
*/
- void get(IValueReference key, IPointable value);
+ boolean get(IValueReference key, IPointable value);
/**
* set the value with the key = key
+ *
* @param key
* @param value
* @throws HyracksDataException
@@ -121,18 +132,21 @@
/**
* Sets the index to be valid in the metadata page
+ *
* @param valid
*/
void setValid(boolean valid);
/**
* Get the storage version associated with this index
+ *
* @return
*/
int getVersion();
/**
* Set the index root page id
+ *
* @param rootPage
*/
void setRootPageId(int rootPage);
@@ -149,6 +163,7 @@
/**
* return the offset to the entry of the passed key, -1, otherwise
+ *
* @param key
*/
int getOffset(IValueReference key);
@@ -162,4 +177,9 @@
* @return true if the inspected page is a free page, false otherwise
*/
boolean isFreePage();
+
+ /**
+ * @return the overhead (in bytes) to store a key-value pair
+ */
+ int getKeyValueStorageOverhead();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
index d0757c8..62251d9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
@@ -36,7 +36,6 @@
* ....
* ....
* [free page 5][free page 4][free page 3][free page 2][free page 1]
- *
*/
public class LIFOMetaDataFrame implements ITreeIndexMetadataFrame {
@@ -167,17 +166,22 @@
}
@Override
- public void get(IValueReference key, IPointable value) {
+ public int getKeyValueStorageOverhead() {
+ return Integer.BYTES * 2;
+ }
+
+ @Override
+ public boolean get(IValueReference key, IPointable value) {
int tupleCount = getTupleCount();
int tupleStart = getTupleStart(0);
for (int i = 0; i < tupleCount; i++) {
if (isInner(key, tupleStart)) {
get(tupleStart + key.getLength() + Integer.BYTES, value);
- return;
+ return true;
}
tupleStart = getNextTupleStart(tupleStart);
}
- value.set(null, 0, 0);
+ return false;
}
private int find(IValueReference key) {
@@ -197,7 +201,7 @@
value.set(buf.array(), offset + Integer.BYTES, valueLength);
}
- private static final int compare(byte[] b1, int s1, byte[] b2, int s2, int l) {
+ private static int compare(byte[] b1, int s1, byte[] b2, int s2, int l) {
for (int i = 0; i < l; i++) {
if (b1[s1 + i] != b2[s2 + i]) {
return b1[s1 + i] - b2[s2 + i];
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
index 852c8b5..dae01bf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
@@ -18,6 +18,9 @@
*/
package org.apache.hyracks.storage.am.common.freepage;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IPointable;
@@ -40,108 +43,36 @@
public class AppendOnlyLinkedMetadataPageManager implements IMetadataPageManager {
private final IBufferCache bufferCache;
+ private final ITreeIndexMetadataFrameFactory frameFactory;
+ private final List<ICachedPage> metadataPages;
private int metadataPage = IBufferCache.INVALID_PAGEID;
private int fileId = -1;
- private final ITreeIndexMetadataFrameFactory frameFactory;
- private ICachedPage confiscatedPage;
+ private ICachedPage currentPage;
+ private ICachedPage firstPage;
private boolean ready = false;
public AppendOnlyLinkedMetadataPageManager(IBufferCache bufferCache, ITreeIndexMetadataFrameFactory frameFactory) {
this.bufferCache = bufferCache;
this.frameFactory = frameFactory;
+ metadataPages = new ArrayList<>();
}
@Override
- public void releasePage(ITreeIndexMetadataFrame metaFrame, int freePageNum) throws HyracksDataException {
- ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, getMetadataPageId()));
- metaPage.acquireWriteLatch();
- try {
- metaFrame.setPage(metaPage);
- if (metaFrame.getSpace() > Integer.BYTES) {
- metaFrame.addFreePage(freePageNum);
- } else {
- int newPageNum = metaFrame.getFreePage();
- if (newPageNum < 0) {
- throw new HyracksDataException(
- "Inconsistent Meta Page State. It has no space, but it also has no entries.");
- }
- ICachedPage newNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, newPageNum));
- newNode.acquireWriteLatch();
- try {
- int metaMaxPage = metaFrame.getMaxPage();
- System.arraycopy(metaPage.getBuffer().array(), 0, newNode.getBuffer().array(), 0,
- metaPage.getBuffer().capacity());
- metaFrame.init();
- metaFrame.setNextMetadataPage(newPageNum);
- metaFrame.setMaxPage(metaMaxPage);
- metaFrame.addFreePage(freePageNum);
- } finally {
- newNode.releaseWriteLatch(true);
- bufferCache.unpin(newNode);
- }
- }
- } finally {
- metaPage.releaseWriteLatch(true);
- bufferCache.unpin(metaPage);
- }
+ public void releasePage(ITreeIndexMetadataFrame metaFrame, int freePageNum) {
+ throw new IllegalAccessError("On-disk pages must be immutable");
}
@Override
- public void releaseBlock(ITreeIndexMetadataFrame metaFrame, int startingPage, int count)
- throws HyracksDataException {
- for (int i = 0; i < count; i++) {
- releasePage(metaFrame, startingPage + i);
- }
+ public void releaseBlock(ITreeIndexMetadataFrame metaFrame, int startingPage, int count) {
+ throw new IllegalAccessError("On-disk pages must be immutable");
}
@Override
public int takePage(ITreeIndexMetadataFrame metaFrame) throws HyracksDataException {
- confiscatedPage.acquireWriteLatch();
- int freePage = IBufferCache.INVALID_PAGEID;
- try {
- metaFrame.setPage(confiscatedPage);
- freePage = metaFrame.getFreePage();
- if (freePage < 0) { // no free page entry on this page
- int nextPage = metaFrame.getNextMetadataPage();
- if (nextPage > 0) { // sibling may have free pages
- ICachedPage nextNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, nextPage));
- nextNode.acquireWriteLatch();
- // we copy over the free space entries of nextpage into the
- // first meta page (metaDataPage)
- // we need to link the first page properly to the next page
- // of nextpage
- try {
- // remember entries that remain unchanged
- int maxPage = metaFrame.getMaxPage();
- // copy entire page (including sibling pointer, free
- // page entries, and all other info)
- // after this copy nextPage is considered a free page
- System.arraycopy(nextNode.getBuffer().array(), 0, confiscatedPage.getBuffer().array(), 0,
- nextNode.getBuffer().capacity());
- // reset unchanged entry
- metaFrame.setMaxPage(maxPage);
- freePage = metaFrame.getFreePage();
- // sibling also has no free pages, this "should" not
- // happen, but we deal with it anyway just to be safe
- if (freePage < 0) {
- freePage = nextPage;
- } else {
- metaFrame.addFreePage(nextPage);
- }
- } finally {
- nextNode.releaseWriteLatch(true);
- bufferCache.unpin(nextNode);
- }
- } else {
- freePage = metaFrame.getMaxPage();
- freePage++;
- metaFrame.setMaxPage(freePage);
- }
- }
- } finally {
- confiscatedPage.releaseWriteLatch(false);
- }
- return freePage;
+ metaFrame.setPage(firstPage);
+ int maxPage = metaFrame.getMaxPage() + 1;
+ metaFrame.setMaxPage(maxPage);
+ return maxPage;
}
@Override
@@ -154,23 +85,22 @@
@Override
public int getMaxPageId(ITreeIndexMetadataFrame metaFrame) throws HyracksDataException {
ICachedPage metaNode;
- if (confiscatedPage == null) {
+ if (firstPage == null) {
int mdPage = getMetadataPageId();
if (mdPage < 0) {
return IBufferCache.INVALID_PAGEID;
}
metaNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, mdPage));
} else {
- metaNode = confiscatedPage;
+ metaNode = firstPage;
}
- metaNode.acquireReadLatch();
+
int maxPage = -1;
try {
metaFrame.setPage(metaNode);
maxPage = metaFrame.getMaxPage();
} finally {
- metaNode.releaseReadLatch();
- if (confiscatedPage == null) {
+ if (firstPage == null) {
bufferCache.unpin(metaNode);
}
}
@@ -195,48 +125,31 @@
int pages = bufferCache.getNumPagesOfFile(fileId);
//if there are no pages in the file yet, we're just initializing
if (pages == 0) {
- if (confiscatedPage != null) {
+ if (firstPage != null) {
throw new HyracksDataException("Metadata Page Manager is already initialized");
}
ITreeIndexMetadataFrame metaFrame = createMetadataFrame();
- ICachedPage metaNode = bufferCache.confiscatePage(BufferCache.INVALID_DPID);
- try {
- metaFrame.setPage(metaNode);
- metaFrame.init();
- metaFrame.setMaxPage(-1);
- } finally {
- confiscatedPage = metaNode;
- }
+ // First to confiscate
+ confiscateNext(metaFrame);
+ firstPage = currentPage;
+ metaFrame.setMaxPage(-1);
}
}
@Override
public void close(IPageWriteFailureCallback failureCallback) throws HyracksDataException {
if (ready) {
- IFIFOPageWriter pageWriter = bufferCache.createFIFOWriter(NoOpPageWriteCallback.INSTANCE, failureCallback,
- DefaultBufferCacheWriteContext.INSTANCE);
- ITreeIndexMetadataFrame metaFrame = frameFactory.createFrame();
- confiscatedPage.acquireWriteLatch();
- try {
- metaFrame.setPage(confiscatedPage);
- metaFrame.setValid(true);
- } finally {
- confiscatedPage.releaseWriteLatch(false);
- }
- int finalMetaPage = getMaxPageId(metaFrame) + 1;
- confiscatedPage.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId, finalMetaPage));
- final ICompressedPageWriter compressedPageWriter = bufferCache.getCompressedPageWriter(fileId);
- compressedPageWriter.prepareWrite(confiscatedPage);
- // WARNING: flushing the metadata page should be done after releasing the write latch; otherwise, the page
- // won't be flushed to disk because it won't be dirty until the write latch has been released.
- pageWriter.write(confiscatedPage);
- compressedPageWriter.endWriting();
+ persist(failureCallback);
metadataPage = getMetadataPageId();
ready = false;
- } else if (confiscatedPage != null) {
- bufferCache.returnPage(confiscatedPage, false);
+ } else if (!metadataPages.isEmpty()) {
+ for (ICachedPage page : metadataPages) {
+ bufferCache.returnPage(page, false);
+ }
}
- confiscatedPage = null;
+ currentPage = null;
+ firstPage = null;
+ metadataPages.clear();
}
/**
@@ -270,32 +183,26 @@
@Override
public void setRootPageId(int rootPage) throws HyracksDataException {
ITreeIndexMetadataFrame metaFrame = frameFactory.createFrame();
- confiscatedPage.acquireWriteLatch();
- try {
- metaFrame.setPage(confiscatedPage);
- metaFrame.setRootPageId(rootPage);
- } finally {
- confiscatedPage.releaseWriteLatch(false);
- }
+ metaFrame.setPage(firstPage);
+ metaFrame.setRootPageId(rootPage);
ready = true;
}
@Override
public int getRootPageId() throws HyracksDataException {
ICachedPage metaNode;
- if (confiscatedPage == null) {
+ if (firstPage == null) {
metaNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, getMetadataPageId()));
} else {
- metaNode = confiscatedPage;
+ metaNode = firstPage;
}
ITreeIndexMetadataFrame metaFrame = frameFactory.createFrame();
- metaNode.acquireReadLatch();
+
try {
metaFrame.setPage(metaNode);
return metaFrame.getRootPageId();
} finally {
- metaNode.releaseReadLatch();
- if (confiscatedPage == null) {
+ if (firstPage == null) {
bufferCache.unpin(metaNode);
}
}
@@ -309,58 +216,122 @@
@Override
public void put(ITreeIndexMetadataFrame frame, IValueReference key, IValueReference value)
throws HyracksDataException {
- if (confiscatedPage == null) {
+ if (currentPage == null) {
throw HyracksDataException.create(ErrorCode.ILLEGAL_WRITE_AFTER_FLUSH_ATTEMPT);
}
- confiscatedPage.acquireWriteLatch();
- try {
- frame.setPage(confiscatedPage);
- frame.put(key, value);
- } finally {
- confiscatedPage.releaseWriteLatch(false);
+
+ frame.setPage(currentPage);
+
+ if (frame.getSpace() < key.getLength() + value.getLength() + frame.getKeyValueStorageOverhead()) {
+ // If there's no space, confiscate an extra page
+ confiscateNext(frame);
+ }
+
+ frame.put(key, value);
+ if (frame.getSpace() == 0) {
+ /*
+ * Most likely a user is writing chunks, confiscate a new page so the next call to
+ * getFreeSpace() will not return 0.
+ */
+ confiscateNext(frame);
}
}
- private ICachedPage pinPage() throws HyracksDataException {
- return confiscatedPage == null ? bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, getMetadataPageId()))
- : confiscatedPage;
+ @Override
+ public boolean get(ITreeIndexMetadataFrame frame, IValueReference key, IPointable value)
+ throws HyracksDataException {
+ int nextPage = getNextPageId(frame, -1);
+ while (nextPage != -1) {
+ ICachedPage page = pinPage(nextPage);
+ try {
+ frame.setPage(page);
+ if (frame.get(key, value)) {
+ return true;
+ }
+ nextPage = getNextPageId(frame, nextPage);
+ } finally {
+ unpinPage(page);
+ }
+ }
+
+ // To preserve the old behavior
+ value.set(null, 0, 0);
+ return false;
+ }
+
+ @Override
+ public int getPageSize() {
+ return bufferCache.getPageSize();
+ }
+
+ @Override
+ public int getFreeSpace() throws HyracksDataException {
+ if (currentPage == null) {
+ throw HyracksDataException.create(ErrorCode.ILLEGAL_WRITE_AFTER_FLUSH_ATTEMPT);
+ }
+ ITreeIndexMetadataFrame frame = createMetadataFrame();
+ frame.setPage(currentPage);
+ return frame.getSpace() - frame.getKeyValueStorageOverhead();
+ }
+
+ private int getNextPageId(ITreeIndexMetadataFrame frame, int previousPageIdx) throws HyracksDataException {
+ if (metadataPages.isEmpty()) {
+ // Read-only (immutable)
+ return previousPageIdx == -1 ? getMetadataPageId() : frame.getNextMetadataPage();
+ }
+
+ // Write (still mutable)
+ int nextPageIdx = previousPageIdx + 1;
+ return nextPageIdx < metadataPages.size() ? nextPageIdx : -1;
+ }
+
+ private ICachedPage pinPage(int pageId) throws HyracksDataException {
+ if (!metadataPages.isEmpty()) {
+ return metadataPages.get(pageId);
+ }
+
+ return bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId));
}
private void unpinPage(ICachedPage page) throws HyracksDataException {
- if (confiscatedPage == null) {
+ if (metadataPages.isEmpty()) {
bufferCache.unpin(page);
}
}
- @Override
- public void get(ITreeIndexMetadataFrame frame, IValueReference key, IPointable value) throws HyracksDataException {
- ICachedPage page = pinPage();
- page.acquireReadLatch();
+ private void confiscateNext(ITreeIndexMetadataFrame metaFrame) throws HyracksDataException {
+ ICachedPage metaNode = bufferCache.confiscatePage(BufferCache.INVALID_DPID);
try {
- frame.setPage(page);
- frame.get(key, value);
+ metaFrame.setPage(metaNode);
+ metaFrame.init();
} finally {
- page.releaseReadLatch();
- unpinPage(page);
+ metadataPages.add(metaNode);
+ currentPage = metaNode;
}
}
- @Override
- public long getFileOffset(ITreeIndexMetadataFrame frame, IValueReference key) throws HyracksDataException {
- int pageId = getMetadataPageId();
- if (pageId != IBufferCache.INVALID_PAGEID) {
- ICachedPage page = pinPage();
- page.acquireReadLatch();
- try {
- frame.setPage(page);
- int inPageOffset = frame.getOffset(key);
- return inPageOffset >= 0 ? ((long) pageId * bufferCache.getPageSizeWithHeader()) + frame.getOffset(key)
- + IBufferCache.RESERVED_HEADER_BYTES : -1L;
- } finally {
- page.releaseReadLatch();
- unpinPage(page);
- }
+ private void persist(IPageWriteFailureCallback failureCallback) throws HyracksDataException {
+ IFIFOPageWriter pageWriter = bufferCache.createFIFOWriter(NoOpPageWriteCallback.INSTANCE, failureCallback,
+ DefaultBufferCacheWriteContext.INSTANCE);
+ ITreeIndexMetadataFrame metaFrame = frameFactory.createFrame();
+ // Last page will have nextPage as -1
+ int nextPage = -1;
+ int pageId = getMaxPageId(metaFrame) + 1;
+ final ICompressedPageWriter compressedPageWriter = bufferCache.getCompressedPageWriter(fileId);
+
+ // Write pages in reverse order (first confiscated page will be the last one to be written)
+ for (int i = metadataPages.size() - 1; i >= 0; i--) {
+ ICachedPage page = metadataPages.get(i);
+ metaFrame.setPage(page);
+ metaFrame.setNextMetadataPage(nextPage);
+ // The validity bit matters in the last written page only. No harm for setting this flag for all pages.
+ metaFrame.setValid(true);
+
+ page.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId, pageId));
+ compressedPageWriter.prepareWrite(page);
+ pageWriter.write(page);
+ nextPage = pageId++;
}
- return -1L;
+ compressedPageWriter.endWriting();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
index edc6d10..fbb6b5f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
@@ -319,24 +319,18 @@
}
@Override
- public void get(ITreeIndexMetadataFrame frame, IValueReference key, IPointable value) throws HyracksDataException {
+ public boolean get(ITreeIndexMetadataFrame frame, IValueReference key, IPointable value)
+ throws HyracksDataException {
throw new HyracksDataException("Unsupported Operation");
}
@Override
- public long getFileOffset(ITreeIndexMetadataFrame frame, IValueReference key) throws HyracksDataException {
- int metadataPageNum = getMetadataPageId();
- if (metadataPageNum != IBufferCache.INVALID_PAGEID) {
- ICachedPage metaNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, getMetadataPageId()));
- metaNode.acquireReadLatch();
- try {
- frame.setPage(metaNode);
- return ((long) metadataPageNum * bufferCache.getPageSizeWithHeader()) + frame.getOffset(key);
- } finally {
- metaNode.releaseReadLatch();
- bufferCache.unpin(metaNode);
- }
- }
- return -1;
+ public int getPageSize() {
+ return bufferCache.getPageSize();
+ }
+
+ @Override
+ public int getFreeSpace() throws HyracksDataException {
+ throw new HyracksDataException("Unsupported Operation");
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnMetadataReaderWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnMetadataReaderWriter.java
new file mode 100644
index 0000000..39a6dd2
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnMetadataReaderWriter.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.utils;
+
+import org.apache.hyracks.api.compression.ICompressorDecompressor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnManager;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnMetadata;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.LSMColumnBTree;
+import org.apache.hyracks.storage.am.lsm.common.api.IComponentMetadata;
+import org.apache.hyracks.storage.common.compression.SnappyCompressorDecompressor;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * A Reader/Writer for {@link IColumnMetadata}
+ */
+@ThreadSafe
+final class ColumnMetadataReaderWriter {
+ private static final Logger LOGGER = LogManager.getLogger();
+ /**
+ * The header consists of two integers: [originalLength | compressedLength]
+ */
+ private static final int CHUNK_HEADER_SIZE = Integer.BYTES * 2;
+ /**
+ * Used to get the columns info from {@link IComponentMetadata#get(IValueReference, ArrayBackedValueStorage)}
+ *
+ * @see LSMColumnBTree#activate()
+ * @see IColumnManager#activate(IValueReference)
+ */
+ private static final MutableArrayValueReference COLUMNS_METADATA_KEY =
+ new MutableArrayValueReference("COLUMNS_METADATA".getBytes());
+
+ /**
+ * The default (and only) compressor today is 'snappy'. In the future, this could be changed.
+ * Old indexes should still use snappy. But new indexes can take whatever {@link ICompressorDecompressor} passed
+ * to it.
+ */
+ private final ICompressorDecompressor compressorDecompressor;
+
+ /**
+ * This is currently {@link ThreadSafe} since {@link SnappyCompressorDecompressor#INSTANCE} is thread safe. If the
+ * {@link ICompressorDecompressor} is modified or changed, the modifier should ensure that either the new
+ * {@link ICompressorDecompressor} is thread safe or the users of this class should create their own instances.
+ */
+ public ColumnMetadataReaderWriter() {
+ compressorDecompressor = SnappyCompressorDecompressor.INSTANCE;
+ }
+
+ /**
+ * Writes the metadata. If the metadata is 'large', then it will be compressed and stored in chunks
+ *
+ * @param metadata to write
+ * @param componentMetadata to store the metadata at
+ */
+ public void writeMetadata(IValueReference metadata, IComponentMetadata componentMetadata)
+ throws HyracksDataException {
+ int requiredLength = COLUMNS_METADATA_KEY.getLength() + metadata.getLength();
+ if (componentMetadata.getAvailableSpace() >= requiredLength) {
+ componentMetadata.put(COLUMNS_METADATA_KEY, metadata);
+ } else {
+ LOGGER.debug("Writing large column metadata of size {} bytes", requiredLength);
+ writeChunks(metadata, componentMetadata);
+ }
+ }
+
+ /**
+ * Read the metadata. If the metadata is chunked, it will be assembled back to its original form
+ *
+ * @param componentMetadata source
+ * @return read metadata
+ */
+ public IValueReference readMetadata(IComponentMetadata componentMetadata) throws HyracksDataException {
+ ArrayBackedValueStorage storage = new ArrayBackedValueStorage();
+ storage.reset();
+
+ if (!componentMetadata.get(COLUMNS_METADATA_KEY, storage)) {
+ readChunks(componentMetadata, storage);
+ }
+
+ return storage;
+ }
+
+ private void writeChunks(IValueReference metadata, IComponentMetadata componentMetadata)
+ throws HyracksDataException {
+ ArrayBackedValueStorage key = new ArrayBackedValueStorage(COLUMNS_METADATA_KEY.getLength() + Integer.BYTES);
+ int originalLength = metadata.getLength();
+
+ int requiredSize = compressorDecompressor.computeCompressedBufferSize(originalLength);
+ ArrayBackedValueStorage compressed = new ArrayBackedValueStorage(requiredSize + CHUNK_HEADER_SIZE);
+
+ // Write the compressed content after CHUNK_HEADER_SIZE
+ int compressedLength = compressorDecompressor.compress(metadata.getByteArray(), 0, originalLength,
+ compressed.getByteArray(), CHUNK_HEADER_SIZE);
+ // Set the size to be the header size + compressedLength
+ compressed.setSize(CHUNK_HEADER_SIZE + compressedLength);
+ // Serialize the original length
+ IntegerPointable.setInteger(compressed.getByteArray(), 0, originalLength);
+ // Serialize the compressed length
+ IntegerPointable.setInteger(compressed.getByteArray(), Integer.BYTES, compressedLength);
+
+ // Write chunks
+ VoidPointable chunk = new VoidPointable();
+ int position = 0;
+ int chunkId = 0;
+ int keyLength = COLUMNS_METADATA_KEY.getLength() + Integer.BYTES;
+ int totalLength = compressed.getLength();
+ while (position < totalLength) {
+ int remaining = totalLength - position;
+ int freeSpace = componentMetadata.getAvailableSpace() - keyLength;
+ // Find the largest chunk size that can be written
+ int chunkLength = Math.min(remaining, freeSpace);
+ // Prepare a chunk
+ chunk.set(compressed.getByteArray(), position, chunkLength);
+ // Write a chunk
+ componentMetadata.put(getChunkKey(chunkId++, key), chunk);
+ position += chunkLength;
+ }
+ }
+
+ private void readChunks(IComponentMetadata componentMetadata, ArrayBackedValueStorage chunk)
+ throws HyracksDataException {
+ ArrayBackedValueStorage key = new ArrayBackedValueStorage(COLUMNS_METADATA_KEY.getLength() + Integer.BYTES);
+ ArrayBackedValueStorage compressed = new ArrayBackedValueStorage();
+ // Ensure large buffer to avoid enlarging the storage multiple times
+ chunk.setSize(componentMetadata.getPageSize());
+
+ int chunkId = 0;
+ // Read the header + the first chunk
+ chunk.reset();
+ componentMetadata.get(getChunkKey(chunkId++, key), chunk);
+ int originalLength = IntegerPointable.getInteger(chunk.getByteArray(), 0);
+ int compressedLength = IntegerPointable.getInteger(chunk.getByteArray(), Integer.BYTES);
+ // Append the first chunk without the header
+ compressed.append(chunk.getByteArray(), CHUNK_HEADER_SIZE, chunk.getLength() - CHUNK_HEADER_SIZE);
+ // Read the remaining chunks
+ int remainingLength = compressedLength - compressed.getLength();
+ while (remainingLength > 0) {
+ chunk.reset();
+ // Get the next chunk
+ componentMetadata.get(getChunkKey(chunkId++, key), chunk);
+ // Append the next chunk
+ compressed.append(chunk);
+ remainingLength -= chunk.getLength();
+ }
+
+ // Decompress 'compressed'
+ int requiredSize = compressorDecompressor.computeCompressedBufferSize(originalLength);
+ // Ensure the size
+ chunk.setSize(requiredSize);
+ int uncompressedLength = compressorDecompressor.uncompress(compressed.getByteArray(), 0, compressedLength,
+ chunk.getByteArray(), 0);
+ if (uncompressedLength != originalLength) {
+ throw new IllegalStateException("Uncompressed size mismatch (original: " + originalLength
+ + ", uncompressed: " + uncompressedLength + ")");
+ }
+
+ // Set the original length
+ chunk.setSize(originalLength);
+ }
+
+ private static IValueReference getChunkKey(int chunkId, ArrayBackedValueStorage storage)
+ throws HyracksDataException {
+ if (chunkId == 0) {
+ // First chunk. Append the key prefix + set the size
+ storage.reset();
+ storage.append(COLUMNS_METADATA_KEY);
+ storage.setSize(COLUMNS_METADATA_KEY.getLength() + Integer.BYTES);
+ }
+
+ IntegerPointable.setInteger(storage.getByteArray(), COLUMNS_METADATA_KEY.getLength(), chunkId);
+ return storage;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java
index fc1e460..95edced 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java
@@ -23,36 +23,24 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.data.std.api.IValueReference;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
-import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnManager;
import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
-import org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.LSMColumnBTree;
import org.apache.hyracks.storage.am.lsm.common.api.IComponentMetadata;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
public class ColumnUtil {
- /**
- * Used to get the columns info from {@link IComponentMetadata#get(IValueReference, ArrayBackedValueStorage)}
- *
- * @see LSMColumnBTree#activate()
- * @see IColumnManager#activate(IValueReference)
- */
- private static final MutableArrayValueReference COLUMNS_METADATA_KEY =
- new MutableArrayValueReference("COLUMNS_METADATA".getBytes());
+ // Currently, ColumnMetadataReaderWriter is thread safe as the snappy compressor/decompressor is thread safe
+ private static final ColumnMetadataReaderWriter READER_WRITER = new ColumnMetadataReaderWriter();
private ColumnUtil() {
}
public static IValueReference getColumnMetadataCopy(IComponentMetadata src) throws HyracksDataException {
- ArrayBackedValueStorage storage = new ArrayBackedValueStorage();
- src.get(COLUMNS_METADATA_KEY, storage);
- return storage;
+ return READER_WRITER.readMetadata(src);
}
public static void putColumnsMetadataValue(IValueReference columnsMetadataValue, IComponentMetadata dest)
throws HyracksDataException {
- dest.put(COLUMNS_METADATA_KEY, columnsMetadataValue);
+ READER_WRITER.writeMetadata(columnsMetadataValue, dest);
}
public static int getColumnPageIndex(int columnOffset, int pageSize) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IComponentMetadata.java
index fa69d7a..8cdc064 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IComponentMetadata.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IComponentMetadata.java
@@ -23,23 +23,31 @@
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
public interface IComponentMetadata {
+ /**
+ * @return page size
+ */
+ int getPageSize();
+
+ /**
+ * @return the available space to store a value
+ */
+ int getAvailableSpace() throws HyracksDataException;
/**
* Put the key value pair in this metadata, overwrite if it exists
*
* @param key
* @param value
- * @throws HyracksDataException
- * if the component is immutable
+ * @throws HyracksDataException if the component is immutable
*/
void put(IValueReference key, IValueReference value) throws HyracksDataException;
/**
* Get the value of the key from the metadata, 0 length value if not exists
*
- * @param key
- * @param value
- * @throws HyracksDataException
+ * @param key of the value
+ * @param storage storage used to store the retrieved value
+ * @return true if the key exists, false otherwise
*/
- void get(IValueReference key, ArrayBackedValueStorage value) throws HyracksDataException;
+ boolean get(IValueReference key, ArrayBackedValueStorage storage) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java
index 649989c..bed72a5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java
@@ -33,13 +33,23 @@
}
@Override
+ public int getPageSize() {
+ return mdpManager.getPageSize();
+ }
+
+ @Override
+ public int getAvailableSpace() throws HyracksDataException {
+ return mdpManager.getFreeSpace();
+ }
+
+ @Override
public void put(IValueReference key, IValueReference value) throws HyracksDataException {
mdpManager.put(mdpManager.createMetadataFrame(), key, value);
}
@Override
- public void get(IValueReference key, ArrayBackedValueStorage value) throws HyracksDataException {
- mdpManager.get(mdpManager.createMetadataFrame(), key, value);
+ public boolean get(IValueReference key, ArrayBackedValueStorage storage) throws HyracksDataException {
+ return mdpManager.get(mdpManager.createMetadataFrame(), key, storage);
}
public void put(MemoryComponentMetadata metadata) throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java
index d0fe8a9..5314658 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java
@@ -35,7 +35,7 @@
}
@Override
- public void get(IValueReference key, ArrayBackedValueStorage value) throws HyracksDataException {
+ public boolean get(IValueReference key, ArrayBackedValueStorage storage) throws HyracksDataException {
throw new IllegalStateException("Attempt to read metadata of empty component");
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java
index 0c2167f..b90b37c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java
@@ -38,6 +38,16 @@
private final List<org.apache.commons.lang3.tuple.Pair<IValueReference, ArrayBackedValueStorage>> store =
new ArrayList<>();
+ @Override
+ public int getPageSize() {
+ return -1;
+ }
+
+ @Override
+ public int getAvailableSpace() throws HyracksDataException {
+ return Integer.MAX_VALUE;
+ }
+
/**
* Note: for memory metadata, it is expected that the key will be constant
*
@@ -64,14 +74,18 @@
* @throws HyracksDataException
*/
@Override
- public void get(IValueReference key, ArrayBackedValueStorage value) throws HyracksDataException {
+ public boolean get(IValueReference key, ArrayBackedValueStorage storage) throws HyracksDataException {
lock.readLock().lock();
try {
- value.reset();
+ storage.reset();
ArrayBackedValueStorage stored = get(key);
if (stored != null) {
- value.append(stored);
+ storage.append(stored);
+ return true;
}
+
+ // Key does not exist
+ return false;
} finally {
lock.readLock().unlock();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressor.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressor.java
index c4855bd..943b2b7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressor.java
@@ -21,7 +21,6 @@
import java.nio.ByteBuffer;
import org.apache.hyracks.api.compression.ICompressorDecompressor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
public class NoOpCompressorDecompressor implements ICompressorDecompressor {
public static final NoOpCompressorDecompressor INSTANCE = new NoOpCompressorDecompressor();
@@ -35,12 +34,24 @@
}
@Override
- public ByteBuffer compress(ByteBuffer uBuffer, ByteBuffer cBuffer) throws HyracksDataException {
+ public int compress(byte[] src, int srcOffset, int srcLen, byte[] dest, int destOffset) {
+ System.arraycopy(src, srcOffset, dest, destOffset, srcLen);
+ return srcLen;
+ }
+
+ @Override
+ public ByteBuffer compress(ByteBuffer uBuffer, ByteBuffer cBuffer) {
return uBuffer;
}
@Override
- public ByteBuffer uncompress(ByteBuffer cBuffer, ByteBuffer uBuffer) throws HyracksDataException {
+ public int uncompress(byte[] src, int srcOffset, int srcLen, byte[] dest, int destOffset) {
+ System.arraycopy(src, srcOffset, dest, destOffset, srcLen);
+ return srcLen;
+ }
+
+ @Override
+ public ByteBuffer uncompress(ByteBuffer cBuffer, ByteBuffer uBuffer) {
return cBuffer;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressor.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressor.java
index 16c9a2d..e3274ad 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressor.java
@@ -29,10 +29,9 @@
* Built-in Snappy compressor/decompressor wrapper
*/
public class SnappyCompressorDecompressor implements ICompressorDecompressor {
- protected static final SnappyCompressorDecompressor INSTANCE = new SnappyCompressorDecompressor();
+ public static final SnappyCompressorDecompressor INSTANCE = new SnappyCompressorDecompressor();
private SnappyCompressorDecompressor() {
-
}
@Override
@@ -41,6 +40,16 @@
}
@Override
+ public int compress(byte[] src, int srcOffset, int srcLen, byte[] dest, int destOffset)
+ throws HyracksDataException {
+ try {
+ return Snappy.compress(src, srcOffset, srcLen, dest, destOffset);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
public ByteBuffer compress(ByteBuffer uBuffer, ByteBuffer cBuffer) throws HyracksDataException {
try {
final int cLength = Snappy.compress(uBuffer.array(), uBuffer.position(), uBuffer.remaining(),
@@ -53,6 +62,16 @@
}
@Override
+ public int uncompress(byte[] src, int srcOffset, int srcLen, byte[] dest, int destOffset)
+ throws HyracksDataException {
+ try {
+ return Snappy.uncompress(src, srcOffset, srcLen, dest, destOffset);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
public ByteBuffer uncompress(ByteBuffer cBuffer, ByteBuffer uBuffer) throws HyracksDataException {
try {
final int uLength = Snappy.uncompress(cBuffer.array(), cBuffer.position(), cBuffer.remaining(),