Merge branch 'gerrit/neo'
Change-Id: Id22e3b96e31924a90c2cc7c7dee2aa828ba18ac6
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
index 78faaff..40b2f5c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
@@ -42,7 +42,6 @@
import org.apache.hyracks.storage.am.btree.impls.BTreeOpContext.PageValidationInfo;
import org.apache.hyracks.storage.am.common.api.IBTreeIndexTupleReference;
import org.apache.hyracks.storage.am.common.api.IPageManager;
-import org.apache.hyracks.storage.am.common.api.ISplitKey;
import org.apache.hyracks.storage.am.common.api.ITreeIndexAccessor;
import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
@@ -51,7 +50,6 @@
import org.apache.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndex;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
-import org.apache.hyracks.storage.am.common.impls.NodeFrontier;
import org.apache.hyracks.storage.am.common.impls.TreeIndexDiskOrderScanCursor;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
@@ -64,17 +62,13 @@
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
import org.apache.hyracks.storage.common.NoOpIndexCursorStats;
-import org.apache.hyracks.storage.common.buffercache.BufferCache;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
-import org.apache.hyracks.util.JSONUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
public class BTree extends AbstractTreeIndex {
public static final float DEFAULT_FILL_FACTOR = 0.7f;
@@ -85,7 +79,7 @@
private final AtomicInteger smoCounter;
private final ReadWriteLock treeLatch;
- private final int maxTupleSize;
+ protected final int maxTupleSize;
public BTree(IBufferCache bufferCache, IPageManager freePageManager, ITreeIndexFrameFactory interiorFrameFactory,
ITreeIndexFrameFactory leafFrameFactory, IBinaryComparatorFactory[] cmpFactories, int fieldCount,
@@ -886,13 +880,13 @@
}
@Override
- public BTreeRangeSearchCursor createSearchCursor(boolean exclusive) {
+ public ITreeIndexCursor createSearchCursor(boolean exclusive) {
IBTreeLeafFrame leafFrame = (IBTreeLeafFrame) btree.getLeafFrameFactory().createFrame();
return new BTreeRangeSearchCursor(leafFrame, exclusive, (IIndexCursorStats) iap.getParameters()
.getOrDefault(HyracksConstants.INDEX_CURSOR_STATS, NoOpIndexCursorStats.INSTANCE));
}
- public BTreeRangeSearchCursor createPointCursor(boolean exclusive, boolean stateful) {
+ public ITreeIndexCursor createPointCursor(boolean exclusive, boolean stateful) {
return createSearchCursor(exclusive);
}
@@ -1004,220 +998,7 @@
@Override
public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
boolean checkIfEmptyIndex, IPageWriteCallback callback) throws HyracksDataException {
- return new BTreeBulkLoader(fillFactor, verifyInput, callback);
- }
-
- public class BTreeBulkLoader extends AbstractTreeIndex.AbstractTreeIndexBulkLoader {
- protected final ISplitKey splitKey;
- protected final boolean verifyInput;
-
- public BTreeBulkLoader(float fillFactor, boolean verifyInput, IPageWriteCallback callback)
- throws HyracksDataException {
- super(fillFactor, callback);
- this.verifyInput = verifyInput;
- splitKey = new BTreeSplitKey(leafFrame.getTupleWriter().createTupleReference());
- splitKey.getTuple().setFieldCount(cmp.getKeyFieldCount());
- }
-
- @Override
- public void add(ITupleReference tuple) throws HyracksDataException {
- try {
- int tupleSize = Math.max(leafFrame.getBytesRequiredToWriteTuple(tuple),
- interiorFrame.getBytesRequiredToWriteTuple(tuple));
- NodeFrontier leafFrontier = nodeFrontiers.get(0);
- int spaceNeeded = tupleWriter.bytesRequired(tuple) + slotSize;
- int spaceUsed = leafFrame.getBuffer().capacity() - leafFrame.getTotalFreeSpace();
-
- // try to free space by compression
- if (spaceUsed + spaceNeeded > leafMaxBytes) {
- leafFrame.compress();
- spaceUsed = leafFrame.getBuffer().capacity() - leafFrame.getTotalFreeSpace();
- }
- //full, allocate new page
- if (spaceUsed + spaceNeeded > leafMaxBytes) {
- if (leafFrame.getTupleCount() == 0) {
- bufferCache.returnPage(leafFrontier.page, false);
- } else {
- leafFrontier.lastTuple.resetByTupleIndex(leafFrame, leafFrame.getTupleCount() - 1);
- if (verifyInput) {
- verifyInputTuple(tuple, leafFrontier.lastTuple);
- }
- int splitKeySize = tupleWriter.bytesRequired(leafFrontier.lastTuple, 0, cmp.getKeyFieldCount());
- splitKey.initData(splitKeySize);
- tupleWriter.writeTupleFields(leafFrontier.lastTuple, 0, cmp.getKeyFieldCount(),
- splitKey.getBuffer().array(), 0);
- splitKey.getTuple().resetByTupleOffset(splitKey.getBuffer().array(), 0);
- splitKey.setLeftPage(leafFrontier.pageId);
-
- propagateBulk(1, pagesToWrite);
-
- leafFrontier.pageId = freePageManager.takePage(metaFrame);
-
- ((IBTreeLeafFrame) leafFrame).setNextLeaf(leafFrontier.pageId);
-
- write(leafFrontier.page);
- for (ICachedPage c : pagesToWrite) {
- write(c);
- }
- pagesToWrite.clear();
- splitKey.setRightPage(leafFrontier.pageId);
- }
- if (tupleSize > maxTupleSize) {
- final long dpid = BufferedFileHandle.getDiskPageId(getFileId(), leafFrontier.pageId);
- // calculate required number of pages.
- int headerSize = Math.max(leafFrame.getPageHeaderSize(), interiorFrame.getPageHeaderSize());
- final int multiplier =
- (int) Math.ceil((double) tupleSize / (bufferCache.getPageSize() - headerSize));
- if (multiplier > 1) {
- leafFrontier.page = bufferCache.confiscateLargePage(dpid, multiplier,
- freePageManager.takeBlock(metaFrame, multiplier - 1));
- } else {
- leafFrontier.page = bufferCache.confiscatePage(dpid);
- }
- leafFrame.setPage(leafFrontier.page);
- leafFrame.initBuffer((byte) 0);
- ((IBTreeLeafFrame) leafFrame).setLargeFlag(true);
- } else {
- final long dpid = BufferedFileHandle.getDiskPageId(getFileId(), leafFrontier.pageId);
- leafFrontier.page = bufferCache.confiscatePage(dpid);
- leafFrame.setPage(leafFrontier.page);
- leafFrame.initBuffer((byte) 0);
- }
- } else {
- if (verifyInput && leafFrame.getTupleCount() > 0) {
- leafFrontier.lastTuple.resetByTupleIndex(leafFrame, leafFrame.getTupleCount() - 1);
- verifyInputTuple(tuple, leafFrontier.lastTuple);
- }
- }
- ((IBTreeLeafFrame) leafFrame).insertSorted(tuple);
- } catch (HyracksDataException | RuntimeException e) {
- logState(tuple, e);
- handleException();
- throw e;
- }
- }
-
- protected void verifyInputTuple(ITupleReference tuple, ITupleReference prevTuple) throws HyracksDataException {
- // New tuple should be strictly greater than last tuple.
- int cmpResult = cmp.compare(tuple, prevTuple);
- if (cmpResult < 0) {
- throw HyracksDataException.create(ErrorCode.UNSORTED_LOAD_INPUT);
- }
- if (cmpResult == 0) {
- throw HyracksDataException.create(ErrorCode.DUPLICATE_LOAD_INPUT);
- }
- }
-
- protected void propagateBulk(int level, List<ICachedPage> pagesToWrite) throws HyracksDataException {
- if (splitKey.getBuffer() == null) {
- return;
- }
-
- if (level >= nodeFrontiers.size()) {
- addLevel();
- }
-
- NodeFrontier frontier = nodeFrontiers.get(level);
- interiorFrame.setPage(frontier.page);
-
- ITupleReference tuple = splitKey.getTuple();
- int tupleBytes = tupleWriter.bytesRequired(tuple, 0, cmp.getKeyFieldCount());
- int spaceNeeded = tupleBytes + slotSize + 4;
- if (tupleBytes > interiorFrame.getMaxTupleSize(BTree.this.bufferCache.getPageSize())) {
- throw HyracksDataException.create(ErrorCode.RECORD_IS_TOO_LARGE, tupleBytes,
- interiorFrame.getMaxTupleSize(BTree.this.bufferCache.getPageSize()));
- }
-
- int spaceUsed = interiorFrame.getBuffer().capacity() - interiorFrame.getTotalFreeSpace();
- if (spaceUsed + spaceNeeded > interiorMaxBytes) {
-
- ISplitKey copyKey = splitKey.duplicate(leafFrame.getTupleWriter().createTupleReference());
- tuple = copyKey.getTuple();
-
- frontier.lastTuple.resetByTupleIndex(interiorFrame, interiorFrame.getTupleCount() - 1);
- int splitKeySize = tupleWriter.bytesRequired(frontier.lastTuple, 0, cmp.getKeyFieldCount());
- splitKey.initData(splitKeySize);
- tupleWriter.writeTupleFields(frontier.lastTuple, 0, cmp.getKeyFieldCount(),
- splitKey.getBuffer().array(), 0);
- splitKey.getTuple().resetByTupleOffset(splitKey.getBuffer().array(), 0);
-
- ((IBTreeInteriorFrame) interiorFrame).deleteGreatest();
- int finalPageId = freePageManager.takePage(metaFrame);
- frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), finalPageId));
- pagesToWrite.add(frontier.page);
- splitKey.setLeftPage(finalPageId);
-
- propagateBulk(level + 1, pagesToWrite);
- frontier.page = bufferCache.confiscatePage(BufferCache.INVALID_DPID);
- interiorFrame.setPage(frontier.page);
- interiorFrame.initBuffer((byte) level);
- }
- ((IBTreeInteriorFrame) interiorFrame).insertSorted(tuple);
- }
-
- private void persistFrontiers(int level, int rightPage) throws HyracksDataException {
- if (level >= nodeFrontiers.size()) {
- rootPage = nodeFrontiers.get(level - 1).pageId;
- releasedLatches = true;
- return;
- }
- if (level < 1) {
- ICachedPage lastLeaf = nodeFrontiers.get(level).page;
- int lastLeafPage = nodeFrontiers.get(level).pageId;
- lastLeaf.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), nodeFrontiers.get(level).pageId));
- write(lastLeaf);
- nodeFrontiers.get(level).page = null;
- persistFrontiers(level + 1, lastLeafPage);
- return;
- }
- NodeFrontier frontier = nodeFrontiers.get(level);
- interiorFrame.setPage(frontier.page);
- //just finalize = the layer right above the leaves has correct righthand pointers already
- if (rightPage < 0) {
- throw new HyracksDataException(
- "Error in index creation. Internal node appears to have no rightmost guide");
- }
- ((IBTreeInteriorFrame) interiorFrame).setRightmostChildPageId(rightPage);
- int finalPageId = freePageManager.takePage(metaFrame);
- frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), finalPageId));
- write(frontier.page);
- frontier.pageId = finalPageId;
- persistFrontiers(level + 1, finalPageId);
- }
-
- @Override
- public void end() throws HyracksDataException {
- try {
- persistFrontiers(0, -1);
- super.end();
- } catch (HyracksDataException | RuntimeException e) {
- handleException();
- throw e;
- }
- }
-
- @Override
- public void abort() throws HyracksDataException {
- super.handleException();
- }
-
- private void logState(ITupleReference tuple, Exception e) {
- try {
- ObjectNode state = JSONUtil.createObject();
- state.set("leafFrame", leafFrame.getState());
- state.set("interiorFrame", interiorFrame.getState());
- int tupleSize = Math.max(leafFrame.getBytesRequiredToWriteTuple(tuple),
- interiorFrame.getBytesRequiredToWriteTuple(tuple));
- state.put("tupleSize", tupleSize);
- state.put("spaceNeeded", tupleWriter.bytesRequired(tuple) + slotSize);
- state.put("spaceUsed", leafFrame.getBuffer().capacity() - leafFrame.getTotalFreeSpace());
- state.put("leafMaxBytes", leafMaxBytes);
- state.put("maxTupleSize", maxTupleSize);
- LOGGER.error("failed to add tuple {}", state, e);
- } catch (Throwable t) {
- e.addSuppressed(t);
- }
- }
+ return new BTreeNSMBulkLoader(fillFactor, verifyInput, callback, this);
}
@SuppressWarnings("rawtypes")
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeNSMBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeNSMBulkLoader.java
new file mode 100644
index 0000000..04c84e1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeNSMBulkLoader.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.btree.impls;
+
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.btree.api.IBTreeInteriorFrame;
+import org.apache.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import org.apache.hyracks.storage.am.common.api.ISplitKey;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
+import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndexBulkLoader;
+import org.apache.hyracks.storage.am.common.impls.NodeFrontier;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.JSONUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class BTreeNSMBulkLoader extends AbstractTreeIndexBulkLoader {
+ private static final Logger LOGGER = LogManager.getLogger();
+ protected final ISplitKey splitKey;
+ protected final boolean verifyInput;
+ private final int maxTupleSize;
+
+ public BTreeNSMBulkLoader(float fillFactor, boolean verifyInput, IPageWriteCallback callback, ITreeIndex index)
+ throws HyracksDataException {
+ this(fillFactor, verifyInput, callback, index, index.getLeafFrameFactory().createFrame());
+ }
+
+ protected BTreeNSMBulkLoader(float fillFactor, boolean verifyInput, IPageWriteCallback callback, ITreeIndex index,
+ ITreeIndexFrame leafFrame) throws HyracksDataException {
+ super(fillFactor, callback, index, leafFrame);
+ this.verifyInput = verifyInput;
+ splitKey = new BTreeSplitKey(tupleWriter.createTupleReference());
+ splitKey.getTuple().setFieldCount(cmp.getKeyFieldCount());
+ maxTupleSize = ((BTree) index).maxTupleSize;
+ }
+
+ @Override
+ public void add(ITupleReference tuple) throws HyracksDataException {
+ try {
+ int tupleSize = Math.max(leafFrame.getBytesRequiredToWriteTuple(tuple),
+ interiorFrame.getBytesRequiredToWriteTuple(tuple));
+ NodeFrontier leafFrontier = nodeFrontiers.get(0);
+ int spaceNeeded = tupleWriter.bytesRequired(tuple) + slotSize;
+ int spaceUsed = leafFrame.getBuffer().capacity() - leafFrame.getTotalFreeSpace();
+
+ // try to free space by compression
+ if (spaceUsed + spaceNeeded > leafMaxBytes) {
+ leafFrame.compress();
+ spaceUsed = leafFrame.getBuffer().capacity() - leafFrame.getTotalFreeSpace();
+ }
+ //full, allocate new page
+ if (spaceUsed + spaceNeeded > leafMaxBytes) {
+ if (leafFrame.getTupleCount() == 0) {
+ //The current page is empty. Return it.
+ bufferCache.returnPage(leafFrontier.page, false);
+ } else {
+ leafFrontier.lastTuple.resetByTupleIndex(leafFrame, leafFrame.getTupleCount() - 1);
+ if (verifyInput) {
+ verifyInputTuple(tuple, leafFrontier.lastTuple);
+ }
+ //The current page is not empty. Write it.
+ writeFullLeafPage();
+ }
+ if (tupleSize > maxTupleSize) {
+ //We need a large page
+ final long dpid = BufferedFileHandle.getDiskPageId(fileId, leafFrontier.pageId);
+ // calculate required number of pages.
+ int headerSize = Math.max(leafFrame.getPageHeaderSize(), interiorFrame.getPageHeaderSize());
+ final int multiplier =
+ (int) Math.ceil((double) tupleSize / (bufferCache.getPageSize() - headerSize));
+ if (multiplier > 1) {
+ leafFrontier.page = bufferCache.confiscateLargePage(dpid, multiplier,
+ freePageManager.takeBlock(metaFrame, multiplier - 1));
+ } else {
+ leafFrontier.page = bufferCache.confiscatePage(dpid);
+ }
+ leafFrame.setPage(leafFrontier.page);
+ leafFrame.initBuffer((byte) 0);
+ ((IBTreeLeafFrame) leafFrame).setLargeFlag(true);
+ } else {
+ //allocate a new page
+ confiscateNewLeafPage();
+ }
+ } else {
+ if (verifyInput && leafFrame.getTupleCount() > 0) {
+ leafFrontier.lastTuple.resetByTupleIndex(leafFrame, leafFrame.getTupleCount() - 1);
+ verifyInputTuple(tuple, leafFrontier.lastTuple);
+ }
+ }
+ ((IBTreeLeafFrame) leafFrame).insertSorted(tuple);
+ } catch (HyracksDataException | RuntimeException e) {
+ logState(tuple, e);
+ handleException();
+ throw e;
+ }
+ }
+
+ protected void verifyInputTuple(ITupleReference tuple, ITupleReference prevTuple) throws HyracksDataException {
+ // New tuple should be strictly greater than last tuple.
+ int cmpResult = cmp.compare(tuple, prevTuple);
+ if (cmpResult < 0) {
+ throw HyracksDataException.create(ErrorCode.UNSORTED_LOAD_INPUT);
+ }
+ if (cmpResult == 0) {
+ throw HyracksDataException.create(ErrorCode.DUPLICATE_LOAD_INPUT);
+ }
+ }
+
+ protected void propagateBulk(int level, List<ICachedPage> pagesToWrite) throws HyracksDataException {
+ if (splitKey.getBuffer() == null) {
+ return;
+ }
+
+ if (level >= nodeFrontiers.size()) {
+ addLevel();
+ }
+
+ NodeFrontier frontier = nodeFrontiers.get(level);
+ interiorFrame.setPage(frontier.page);
+
+ ITupleReference tuple = splitKey.getTuple();
+ int tupleBytes = tupleWriter.bytesRequired(tuple, 0, cmp.getKeyFieldCount());
+ int spaceNeeded = tupleBytes + slotSize + 4;
+ if (tupleBytes > interiorFrame.getMaxTupleSize(bufferCache.getPageSize())) {
+ throw HyracksDataException.create(ErrorCode.RECORD_IS_TOO_LARGE, tupleBytes,
+ interiorFrame.getMaxTupleSize(bufferCache.getPageSize()));
+ }
+
+ int spaceUsed = interiorFrame.getBuffer().capacity() - interiorFrame.getTotalFreeSpace();
+ if (spaceUsed + spaceNeeded > interiorMaxBytes) {
+ ISplitKey copyKey = splitKey.duplicate(tupleWriter.createTupleReference());
+ tuple = copyKey.getTuple();
+
+ frontier.lastTuple.resetByTupleIndex(interiorFrame, interiorFrame.getTupleCount() - 1);
+ int splitKeySize = tupleWriter.bytesRequired(frontier.lastTuple, 0, cmp.getKeyFieldCount());
+ splitKey.initData(splitKeySize);
+ tupleWriter.writeTupleFields(frontier.lastTuple, 0, cmp.getKeyFieldCount(), splitKey.getBuffer().array(),
+ 0);
+ splitKey.getTuple().resetByTupleOffset(splitKey.getBuffer().array(), 0);
+
+ ((IBTreeInteriorFrame) interiorFrame).deleteGreatest();
+ int finalPageId = freePageManager.takePage(metaFrame);
+ frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId, finalPageId));
+ pagesToWrite.add(frontier.page);
+ splitKey.setLeftPage(finalPageId);
+
+ propagateBulk(level + 1, pagesToWrite);
+ frontier.page = bufferCache.confiscatePage(IBufferCache.INVALID_DPID);
+ interiorFrame.setPage(frontier.page);
+ interiorFrame.initBuffer((byte) level);
+ }
+ ((IBTreeInteriorFrame) interiorFrame).insertSorted(tuple);
+ }
+
+ private void persistFrontiers(int level, int rightPage) throws HyracksDataException {
+ if (level >= nodeFrontiers.size()) {
+ setRootPageId(nodeFrontiers.get(level - 1).pageId);
+ releasedLatches = true;
+ return;
+ }
+ if (level < 1) {
+ ICachedPage lastLeaf = nodeFrontiers.get(level).page;
+ int lastLeafPage = nodeFrontiers.get(level).pageId;
+ lastLeaf.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId, nodeFrontiers.get(level).pageId));
+ writeLastLeaf(lastLeaf);
+ nodeFrontiers.get(level).page = null;
+ persistFrontiers(level + 1, lastLeafPage);
+ return;
+ }
+ NodeFrontier frontier = nodeFrontiers.get(level);
+ interiorFrame.setPage(frontier.page);
+ //just finalize = the layer right above the leaves has correct righthand pointers already
+ if (rightPage < 0) {
+ throw new HyracksDataException("Error in index creation. Internal node appears to have no rightmost guide");
+ }
+ ((IBTreeInteriorFrame) interiorFrame).setRightmostChildPageId(rightPage);
+ int finalPageId = freePageManager.takePage(metaFrame);
+ frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId, finalPageId));
+ write(frontier.page);
+ frontier.pageId = finalPageId;
+ persistFrontiers(level + 1, finalPageId);
+ }
+
+ @Override
+ public void end() throws HyracksDataException {
+ try {
+ persistFrontiers(0, -1);
+ super.end();
+ } catch (HyracksDataException | RuntimeException e) {
+ handleException();
+ throw e;
+ }
+ }
+
+ @Override
+ public void abort() throws HyracksDataException {
+ super.handleException();
+ }
+
+ protected void writeFullLeafPage() throws HyracksDataException {
+ final NodeFrontier leafFrontier = nodeFrontiers.get(0);
+ leafFrontier.lastTuple.resetByTupleIndex(leafFrame, leafFrame.getTupleCount() - 1);
+ final int splitKeySize = tupleWriter.bytesRequired(leafFrontier.lastTuple, 0, cmp.getKeyFieldCount());
+ splitKey.initData(splitKeySize);
+ tupleWriter.writeTupleFields(leafFrontier.lastTuple, 0, cmp.getKeyFieldCount(), splitKey.getBuffer().array(),
+ 0);
+ splitKey.getTuple().resetByTupleOffset(splitKey.getBuffer().array(), 0);
+ splitKey.setLeftPage(leafFrontier.pageId);
+
+ propagateBulk(1, pagesToWrite);
+
+ leafFrontier.pageId = freePageManager.takePage(metaFrame);
+
+ ((IBTreeLeafFrame) leafFrame).setNextLeaf(leafFrontier.pageId);
+
+ write(leafFrontier.page);
+ for (ICachedPage c : pagesToWrite) {
+ write(c);
+ }
+ pagesToWrite.clear();
+ splitKey.setRightPage(leafFrontier.pageId);
+ }
+
+ protected void writeLastLeaf(ICachedPage page) throws HyracksDataException {
+ write(page);
+ }
+
+ protected final void confiscateNewLeafPage() throws HyracksDataException {
+ final NodeFrontier leafFrontier = nodeFrontiers.get(0);
+ final long dpid = BufferedFileHandle.getDiskPageId(fileId, leafFrontier.pageId);
+ leafFrontier.page = bufferCache.confiscatePage(dpid);
+ leafFrame.setPage(leafFrontier.page);
+ leafFrame.initBuffer((byte) 0);
+ }
+
+ private void logState(ITupleReference tuple, Exception e) {
+ try {
+ ObjectNode state = JSONUtil.createObject();
+ state.set("leafFrame", leafFrame.getState());
+ state.set("interiorFrame", interiorFrame.getState());
+ int tupleSize = Math.max(leafFrame.getBytesRequiredToWriteTuple(tuple),
+ interiorFrame.getBytesRequiredToWriteTuple(tuple));
+ state.put("tupleSize", tupleSize);
+ state.put("spaceNeeded", tupleWriter.bytesRequired(tuple) + slotSize);
+ state.put("spaceUsed", leafFrame.getBuffer().capacity() - leafFrame.getTotalFreeSpace());
+ state.put("leafMaxBytes", leafMaxBytes);
+ state.put("maxTupleSize", maxTupleSize);
+ LOGGER.error("failed to add tuple {}", state, e);
+ } catch (Throwable t) {
+ e.addSuppressed(t);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index 81e528b..11368bf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -19,9 +19,6 @@
package org.apache.hyracks.storage.am.common.impls;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -30,17 +27,9 @@
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
-import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame;
-import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
-import org.apache.hyracks.storage.common.IIndexBulkLoader;
-import org.apache.hyracks.storage.common.MultiComparator;
import org.apache.hyracks.storage.common.buffercache.HaltOnFailureCallback;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
-import org.apache.hyracks.storage.common.buffercache.IFIFOPageWriter;
-import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
-import org.apache.hyracks.storage.common.buffercache.PageWriteFailureCallback;
-import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
public abstract class AbstractTreeIndex implements ITreeIndex {
@@ -229,118 +218,6 @@
return fieldCount;
}
- public abstract class AbstractTreeIndexBulkLoader extends PageWriteFailureCallback implements IIndexBulkLoader {
- protected final MultiComparator cmp;
- protected final int slotSize;
- protected final int leafMaxBytes;
- protected final int interiorMaxBytes;
- protected final ArrayList<NodeFrontier> nodeFrontiers = new ArrayList<>();
- protected final ITreeIndexMetadataFrame metaFrame;
- protected final ITreeIndexTupleWriter tupleWriter;
- protected ITreeIndexFrame leafFrame;
- protected ITreeIndexFrame interiorFrame;
- // Immutable bulk loaders write their root page at page -2, as needed e.g. by append-only file systems such as
- // HDFS. Since loading this tree relies on the root page actually being at that point, no further inserts into
- // that tree are allowed. Currently, this is not enforced.
- protected boolean releasedLatches;
- private final IFIFOPageWriter pageWriter;
- protected List<ICachedPage> pagesToWrite;
- private final ICompressedPageWriter compressedPageWriter;
-
- public AbstractTreeIndexBulkLoader(float fillFactor, IPageWriteCallback callback) throws HyracksDataException {
- leafFrame = leafFrameFactory.createFrame();
- interiorFrame = interiorFrameFactory.createFrame();
- metaFrame = freePageManager.createMetadataFrame();
-
- pageWriter = bufferCache.createFIFOWriter(callback, this);
-
- if (!isEmptyTree(leafFrame)) {
- throw HyracksDataException.create(ErrorCode.CANNOT_BULK_LOAD_NON_EMPTY_TREE);
- }
-
- this.cmp = MultiComparator.create(cmpFactories);
-
- leafFrame.setMultiComparator(cmp);
- interiorFrame.setMultiComparator(cmp);
-
- tupleWriter = leafFrame.getTupleWriter();
-
- NodeFrontier leafFrontier = new NodeFrontier(leafFrame.createTupleReference());
- leafFrontier.pageId = freePageManager.takePage(metaFrame);
- leafFrontier.page =
- bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, leafFrontier.pageId));
-
- interiorFrame.setPage(leafFrontier.page);
- interiorFrame.initBuffer((byte) 0);
- interiorMaxBytes = (int) (interiorFrame.getBuffer().capacity() * fillFactor);
-
- leafFrame.setPage(leafFrontier.page);
- leafFrame.initBuffer((byte) 0);
- leafMaxBytes = (int) (leafFrame.getBuffer().capacity() * fillFactor);
- slotSize = leafFrame.getSlotSize();
-
- nodeFrontiers.add(leafFrontier);
- pagesToWrite = new ArrayList<>();
- compressedPageWriter = bufferCache.getCompressedPageWriter(fileId);
- }
-
- protected void handleException() {
- // Unlatch and unpin pages that weren't in the queue to avoid leaking memory.
- compressedPageWriter.abort();
- for (NodeFrontier nodeFrontier : nodeFrontiers) {
- if (nodeFrontier != null && nodeFrontier.page != null) {
- ICachedPage frontierPage = nodeFrontier.page;
- if (frontierPage.confiscated()) {
- bufferCache.returnPage(frontierPage, false);
- }
- }
- }
- for (ICachedPage pageToDiscard : pagesToWrite) {
- if (pageToDiscard != null) {
- bufferCache.returnPage(pageToDiscard, false);
- }
- }
- releasedLatches = true;
- }
-
- @Override
- public void end() throws HyracksDataException {
- if (hasFailed()) {
- throw HyracksDataException.create(getFailure());
- }
- freePageManager.setRootPageId(rootPage);
- }
-
- protected void addLevel() throws HyracksDataException {
- NodeFrontier frontier = new NodeFrontier(tupleWriter.createTupleReference());
- frontier.page = bufferCache.confiscatePage(IBufferCache.INVALID_DPID);
- frontier.pageId = -1;
- frontier.lastTuple.setFieldCount(cmp.getKeyFieldCount());
- interiorFrame.setPage(frontier.page);
- interiorFrame.initBuffer((byte) nodeFrontiers.size());
- nodeFrontiers.add(frontier);
- }
-
- public ITreeIndexFrame getLeafFrame() {
- return leafFrame;
- }
-
- public void setLeafFrame(ITreeIndexFrame leafFrame) {
- this.leafFrame = leafFrame;
- }
-
- public void write(ICachedPage cPage) throws HyracksDataException {
- compressedPageWriter.prepareWrite(cPage);
- pageWriter.write(cPage);
- }
-
- @Override
- public void force() throws HyracksDataException {
- bufferCache.force(fileId, false);
- }
-
- }
-
public IBinaryComparatorFactory[] getCmpFactories() {
return cmpFactories;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java
new file mode 100644
index 0000000..45a88a7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.common.impls;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IPageManager;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleReference;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.IFIFOPageWriter;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
+import org.apache.hyracks.storage.common.buffercache.PageWriteFailureCallback;
+import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+
+public abstract class AbstractTreeIndexBulkLoader extends PageWriteFailureCallback implements IIndexBulkLoader {
+ protected final IBufferCache bufferCache;
+ protected final IPageManager freePageManager;
+ protected final AbstractTreeIndex treeIndex;
+ protected final int fileId;
+ protected final MultiComparator cmp;
+ protected final int slotSize;
+ protected final int leafMaxBytes;
+ protected final int interiorMaxBytes;
+ protected final ArrayList<NodeFrontier> nodeFrontiers = new ArrayList<>();
+ protected final ITreeIndexMetadataFrame metaFrame;
+ protected final ITreeIndexTupleWriter tupleWriter;
+ protected ITreeIndexFrame leafFrame;
+ protected ITreeIndexFrame interiorFrame;
+ // Immutable bulk loaders write their root page at page -2, as needed e.g. by append-only file systems such as
+ // HDFS. Since loading this tree relies on the root page actually being at that point, no further inserts into
+ // that tree are allowed. Currently, this is not enforced.
+ protected boolean releasedLatches;
+ private final IFIFOPageWriter pageWriter;
+ protected List<ICachedPage> pagesToWrite;
+ private final ICompressedPageWriter compressedPageWriter;
+
+ protected AbstractTreeIndexBulkLoader(float fillFactor, IPageWriteCallback callback, ITreeIndex index)
+ throws HyracksDataException {
+ this(fillFactor, callback, index, index.getLeafFrameFactory().createFrame());
+ }
+
+ protected AbstractTreeIndexBulkLoader(float fillFactor, IPageWriteCallback callback, ITreeIndex index,
+ ITreeIndexFrame leafFrame) throws HyracksDataException {
+ this.bufferCache = index.getBufferCache();
+ this.freePageManager = index.getPageManager();
+ this.fileId = index.getFileId();
+ this.treeIndex = (AbstractTreeIndex) index;
+ this.leafFrame = leafFrame;
+ interiorFrame = treeIndex.getInteriorFrameFactory().createFrame();
+ metaFrame = freePageManager.createMetadataFrame();
+
+ pageWriter = bufferCache.createFIFOWriter(callback, this);
+
+ if (!treeIndex.isEmptyTree(leafFrame)) {
+ throw HyracksDataException.create(ErrorCode.CANNOT_BULK_LOAD_NON_EMPTY_TREE);
+ }
+
+ this.cmp = MultiComparator.create(treeIndex.getCmpFactories());
+
+ leafFrame.setMultiComparator(cmp);
+ interiorFrame.setMultiComparator(cmp);
+
+ tupleWriter = leafFrame.getTupleWriter();
+ NodeFrontier leafFrontier = new NodeFrontier(createTupleReference());
+ leafFrontier.pageId = freePageManager.takePage(metaFrame);
+ leafFrontier.page = bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, leafFrontier.pageId));
+
+ interiorFrame.setPage(leafFrontier.page);
+ interiorFrame.initBuffer((byte) 0);
+ interiorMaxBytes = (int) (interiorFrame.getBuffer().capacity() * fillFactor);
+
+ leafFrame.setPage(leafFrontier.page);
+ leafFrame.initBuffer((byte) 0);
+ leafMaxBytes = (int) (leafFrame.getBuffer().capacity() * fillFactor);
+ slotSize = leafFrame.getSlotSize();
+
+ nodeFrontiers.add(leafFrontier);
+ pagesToWrite = new ArrayList<>();
+ compressedPageWriter = bufferCache.getCompressedPageWriter(fileId);
+ }
+
+ protected ITreeIndexTupleReference createTupleReference() {
+ return leafFrame.createTupleReference();
+ }
+
+ protected void handleException() {
+ // Unlatch and unpin pages that weren't in the queue to avoid leaking memory.
+ compressedPageWriter.abort();
+ for (NodeFrontier nodeFrontier : nodeFrontiers) {
+ if (nodeFrontier != null && nodeFrontier.page != null) {
+ ICachedPage frontierPage = nodeFrontier.page;
+ if (frontierPage.confiscated()) {
+ bufferCache.returnPage(frontierPage, false);
+ }
+ }
+ }
+ for (ICachedPage pageToDiscard : pagesToWrite) {
+ if (pageToDiscard != null) {
+ bufferCache.returnPage(pageToDiscard, false);
+ }
+ }
+ releasedLatches = true;
+ }
+
+ @Override
+ public void end() throws HyracksDataException {
+ if (hasFailed()) {
+ throw HyracksDataException.create(getFailure());
+ }
+ freePageManager.setRootPageId(treeIndex.getRootPageId());
+ }
+
+ protected void setRootPageId(int rootPage) {
+ treeIndex.rootPage = rootPage;
+ }
+
+ protected void addLevel() throws HyracksDataException {
+ NodeFrontier frontier = new NodeFrontier(tupleWriter.createTupleReference());
+ frontier.page = bufferCache.confiscatePage(IBufferCache.INVALID_DPID);
+ frontier.pageId = -1;
+ frontier.lastTuple.setFieldCount(cmp.getKeyFieldCount());
+ interiorFrame.setPage(frontier.page);
+ interiorFrame.initBuffer((byte) nodeFrontiers.size());
+ nodeFrontiers.add(frontier);
+ }
+
+ public ITreeIndexFrame getLeafFrame() {
+ return leafFrame;
+ }
+
+ public void setLeafFrame(ITreeIndexFrame leafFrame) {
+ this.leafFrame = leafFrame;
+ }
+
+ public void write(ICachedPage cPage) throws HyracksDataException {
+ compressedPageWriter.prepareWrite(cPage);
+ pageWriter.write(cPage);
+ }
+
+ @Override
+ public void force() throws HyracksDataException {
+ bufferCache.force(fileId, false);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
index 30813ef..4e024c0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
@@ -34,6 +34,7 @@
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeBatchPointSearchCursor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.common.IIndexCursor;
@@ -59,7 +60,7 @@
@Override
protected IIndexCursor createCursor() throws HyracksDataException {
ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
- return new LSMBTreeBatchPointSearchCursor(lsmAccessor.getOpContext());
+ return ((LSMBTree) index).createBatchPointSearchCursor(lsmAccessor.getOpContext());
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index c3d1416..d2fbbef 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -341,7 +341,7 @@
try {
List<ILSMComponent> mergedComponents = mergeOp.getMergingComponents();
long numElements = getNumberOfElements(mergedComponents);
- mergedComponent = createDiskComponent(componentFactory, mergeOp.getTarget(), null,
+ mergedComponent = createDiskComponent(getMergeComponentFactory(), mergeOp.getTarget(), null,
mergeOp.getBloomFilterTarget(), true);
IPageWriteCallback pageWriteCallback = pageWriteCallbackFactory.createPageWriteCallback();
componentBulkLoader = mergedComponent.createBulkLoader(operation, 1.0f, false, numElements, false,
@@ -418,7 +418,7 @@
}
public ILSMIndexAccessor createAccessor(AbstractLSMIndexOperationContext opCtx) {
- return new LSMTreeIndexAccessor(getHarness(), opCtx, cursorFactory);
+ return new LSMTreeIndexAccessor(getHarness(), opCtx, getCursorFactory());
}
@Override
@@ -483,8 +483,28 @@
returnDeletedTuples = true;
}
IIndexCursorStats stats = new IndexCursorStats();
- LSMBTreeRangeSearchCursor cursor = new LSMBTreeRangeSearchCursor(opCtx, returnDeletedTuples, stats);
+ LSMBTreeRangeSearchCursor cursor = createCursor(opCtx, returnDeletedTuples, stats);
return new LSMBTreeMergeOperation(accessor, cursor, stats, mergeFileRefs.getInsertIndexFileReference(),
mergeFileRefs.getBloomFilterFileReference(), callback, getIndexIdentifier());
}
+
+ public LSMBTreeBatchPointSearchCursor createBatchPointSearchCursor(ILSMIndexOperationContext opCtx) {
+ return new LSMBTreeBatchPointSearchCursor(opCtx);
+ }
+
+ protected LSMBTreeRangeSearchCursor createCursor(AbstractLSMIndexOperationContext opCtx,
+ boolean returnDeletedTuples, IIndexCursorStats stats) {
+ return new LSMBTreeRangeSearchCursor(opCtx, returnDeletedTuples, stats);
+ }
+
+ /**
+ * @return Merge component factory (could be different from {@link #componentFactory}
+ */
+ protected ILSMDiskComponentFactory getMergeComponentFactory() {
+ return componentFactory;
+ }
+
+ protected ICursorFactory getCursorFactory() {
+ return cursorFactory;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
index 1312e30..a00e10e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
@@ -43,7 +43,7 @@
import org.apache.hyracks.storage.common.MultiComparator;
import org.apache.hyracks.util.trace.ITracer;
-public final class LSMBTreeOpContext extends AbstractLSMIndexOperationContext {
+public class LSMBTreeOpContext extends AbstractLSMIndexOperationContext {
/*
* Finals
@@ -74,9 +74,9 @@
IBinaryComparatorFactory[] filterCmpFactories, ITracer tracer) {
super(index, btreeFields, filterFields, filterCmpFactories, searchCallback, modificationCallback, tracer);
LSMBTreeMemoryComponent c = (LSMBTreeMemoryComponent) mutableComponents.get(0);
- IBinaryComparatorFactory cmpFactories[] = c.getIndex().getComparatorFactories();
+ IBinaryComparatorFactory[] cmpFactories = c.getIndex().getComparatorFactories();
if (cmpFactories[0] != null) {
- this.cmp = MultiComparator.create(c.getIndex().getComparatorFactories());
+ this.cmp = createMultiComparator(c.getIndex().getComparatorFactories());
} else {
this.cmp = null;
}
@@ -112,6 +112,10 @@
insertSearchCursor = new LSMBTreePointSearchCursor(this);
}
+ protected MultiComparator createMultiComparator(IBinaryComparatorFactory[] cmpFactories) {
+ return MultiComparator.create(cmpFactories);
+ }
+
@Override
public void setOperation(IndexOperation newOp) {
reset();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
index d4903d9..7bb96c3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
@@ -199,7 +199,8 @@
for (int i = 0; i < numBTrees; i++) {
ILSMComponent component = operationalComponents.get(i);
BTree btree = (BTree) component.getIndex();
- if (component.getType() == LSMComponentType.MEMORY) {
+ LSMComponentType type = component.getType();
+ if (type == LSMComponentType.MEMORY) {
includeMutableComponent = true;
if (bloomFilters[i] != null) {
destroyAndNullifyCursorAtIndex(i);
@@ -212,8 +213,8 @@
}
if (btreeAccessors[i] == null) {
- btreeAccessors[i] = btree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- btreeCursors[i] = btreeAccessors[i].createPointCursor(false, false);
+ btreeAccessors[i] = createAccessor(type, btree, i);
+ btreeCursors[i] = createCursor(type, btreeAccessors[i]);
} else {
// re-use
btreeAccessors[i].reset(btree, NoOpIndexAccessParameters.INSTANCE);
@@ -225,6 +226,14 @@
hashComputed = false;
}
+ protected BTreeAccessor createAccessor(LSMComponentType type, BTree btree, int i) throws HyracksDataException {
+ return btree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ }
+
+ protected ITreeIndexCursor createCursor(LSMComponentType type, BTreeAccessor btreeAccessor) {
+ return btreeAccessor.createPointCursor(false, type == LSMComponentType.DISK);
+ }
+
private void destroyAndNullifyCursorAtIndex(int i) throws HyracksDataException {
// component at location i was a disk component before, and is now a memory component, or vise versa
bloomFilters[i] = null;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 2c5fb50..968416c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -49,7 +49,7 @@
private final RangePredicate reusablePred;
private ISearchOperationCallback searchCallback;
private BTreeAccessor[] btreeAccessors;
- private boolean[] isMemoryComponent;
+ protected boolean[] isMemoryComponent;
private ArrayTupleBuilder tupleBuilder;
private boolean canCallProceed = true;
private boolean resultOfSearchCallbackProceed = false;
@@ -149,6 +149,7 @@
// There are no more elements in the memory component.. can safely skip locking for the
// remaining operations
includeMutableComponent = false;
+ excludeMemoryComponent();
}
}
} else {
@@ -180,6 +181,7 @@
// the tree of head tuple
// the head element of PQ is useless now
PriorityQueueElement e = outputPriorityQueue.poll();
+ markAsDeleted(e);
pushIntoQueueFromCursorAndReplaceThisElement(e);
} else {
// If the previous tuple and the head tuple are different
@@ -200,6 +202,14 @@
}
+ protected void excludeMemoryComponent() {
+ //NoOp
+ }
+
+ protected void markAsDeleted(PriorityQueueElement e) throws HyracksDataException {
+ //NoOp
+ }
+
private void pushOutputElementIntoQueueIfNeeded() throws HyracksDataException {
if (needPushElementIntoQueue) {
pushIntoQueueFromCursorAndReplaceThisElement(outputElement);
@@ -250,7 +260,7 @@
}
}
- private int replaceFrom() throws HyracksDataException {
+ protected int replaceFrom() throws HyracksDataException {
int replaceFrom = -1;
if (!switchPossible) {
return replaceFrom;
@@ -386,20 +396,21 @@
}
for (int i = 0; i < numBTrees; i++) {
ILSMComponent component = operationalComponents.get(i);
+ LSMComponentType type = component.getType();
BTree btree;
if (component.getType() == LSMComponentType.MEMORY) {
includeMutableComponent = true;
}
btree = (BTree) component.getIndex();
if (btreeAccessors[i] == null || destroyIncompatible(component, i)) {
- btreeAccessors[i] = btree.createAccessor(iap);
- rangeCursors[i] = btreeAccessors[i].createSearchCursor(false);
+ btreeAccessors[i] = createAccessor(type, btree, i);
+ rangeCursors[i] = createCursor(type, btreeAccessors[i]);
} else {
// re-use
btreeAccessors[i].reset(btree, iap);
rangeCursors[i].close();
}
- isMemoryComponent[i] = component.getType() == LSMComponentType.MEMORY;
+ isMemoryComponent[i] = type == LSMComponentType.MEMORY;
}
IndexCursorUtils.open(btreeAccessors, rangeCursors, searchPred);
try {
@@ -433,4 +444,12 @@
return resultOfSearchCallbackProceed;
}
+ protected BTreeAccessor createAccessor(LSMComponentType type, BTree btree, int index) throws HyracksDataException {
+ return btree.createAccessor(iap);
+ }
+
+ protected IIndexCursor createCursor(LSMComponentType type, BTreeAccessor accessor) {
+ return accessor.createSearchCursor(false);
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeSearchCursor.java
index efacad1..aa72267 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeSearchCursor.java
@@ -46,6 +46,13 @@
scanCursor = new LSMBTreeDiskComponentScanCursor(opCtx);
}
+ protected LSMBTreeSearchCursor(LSMBTreePointSearchCursor pointCursor, LSMBTreeRangeSearchCursor rangeCursor,
+ LSMBTreeDiskComponentScanCursor scanCursor) {
+ this.pointCursor = pointCursor;
+ this.rangeCursor = rangeCursor;
+ this.scanCursor = scanCursor;
+ }
+
@Override
public void doOpen(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
LSMBTreeCursorInitialState lsmInitialState = (LSMBTreeCursorInitialState) initialState;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java
index 13a0e27..acb84e1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java
@@ -20,7 +20,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndex.AbstractTreeIndexBulkLoader;
+import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndexBulkLoader;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMTreeTupleWriter;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java
index b7eb115..11385de 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
import org.apache.hyracks.storage.am.common.api.ILSMIndexCursor;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -50,7 +51,7 @@
protected boolean open;
protected RTreeSearchCursor[] rtreeCursors;
- protected BTreeRangeSearchCursor[] btreeCursors;
+ protected ITreeIndexCursor[] btreeCursors;
protected RTreeAccessor[] rtreeAccessors;
protected BTreeAccessor[] btreeAccessors;
protected BloomFilter[] bloomFilters;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 8e5cb35..729ca74 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -142,7 +142,8 @@
bTreeTupleSorter = new TreeTupleSorter(flushingComponent.getBuddyIndex().getFileId(), linearizerArray,
btreeLeafFrameFactory.createFrame(), btreeLeafFrameFactory.createFrame(),
flushingComponent.getBuddyIndex().getBufferCache(), comparatorFields);
- BTreeRangeSearchCursor btreeScanCursor = memBTreeAccessor.createSearchCursor(false);
+ BTreeRangeSearchCursor btreeScanCursor =
+ (BTreeRangeSearchCursor) memBTreeAccessor.createSearchCursor(false);
try {
isEmpty = true;
memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
index 7e8f249..d85200f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
@@ -41,6 +41,7 @@
import org.apache.hyracks.storage.am.common.frames.AbstractSlotManager;
import org.apache.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndex;
+import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndexBulkLoader;
import org.apache.hyracks.storage.am.common.impls.NodeFrontier;
import org.apache.hyracks.storage.am.common.impls.TreeIndexDiskOrderScanCursor;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -902,7 +903,7 @@
return new RTreeBulkLoader(fillFactor, callback);
}
- public class RTreeBulkLoader extends AbstractTreeIndex.AbstractTreeIndexBulkLoader {
+ public class RTreeBulkLoader extends AbstractTreeIndexBulkLoader {
ITreeIndexFrame lowerFrame, prevInteriorFrame;
RTreeTypeAwareTupleWriter interiorFrameTupleWriter =
((RTreeTypeAwareTupleWriter) interiorFrame.getTupleWriter());
@@ -911,7 +912,7 @@
List<Integer> prevNodeFrontierPages = new ArrayList<>();
public RTreeBulkLoader(float fillFactor, IPageWriteCallback callback) throws HyracksDataException {
- super(fillFactor, callback);
+ super(fillFactor, callback, RTree.this);
prevInteriorFrame = interiorFrameFactory.createFrame();
}