[ASTERIXDB-2708] Introduce batch and stateful point cursors
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Add a stateful btree point search cursor that remembers the position of the
previous search and uses exponential search from it to speed up point searches
- Add a batching LSM btree point search cursor to perform point searches for
a batch of keys. Search states are cleared after each batch.
Change-Id: I0b0ade723895bcd71463df7a9703fe78a238e6c7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5484
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Luo Chen <cluo8@uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
index 94d3461..8c2c1df 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -50,6 +50,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -138,7 +139,8 @@
builder.getJobSpec(), opSchema, typeEnv, context, jobGenParams.getRetainInput(), retainMissing, dataset,
jobGenParams.getIndexName(), lowKeyIndexes, highKeyIndexes, jobGenParams.isLowKeyInclusive(),
jobGenParams.isHighKeyInclusive(), propagateFilter, minFilterFieldIndexes, maxFilterFieldIndexes,
- tupleFilterFactory, outputLimit, unnestMap.getGenerateCallBackProceedResultVar());
+ tupleFilterFactory, outputLimit, unnestMap.getGenerateCallBackProceedResultVar(),
+ isPrimaryIndexPointSearch(op));
IOperatorDescriptor opDesc = btreeSearch.first;
opDesc.setSourceLocation(unnestMap.getSourceLocation());
@@ -149,6 +151,32 @@
builder.contributeGraphEdge(srcExchange, 0, unnestMap, 0);
}
+ private boolean isPrimaryIndexPointSearch(ILogicalOperator op) {
+ if (!isEqCondition || !isPrimaryIndex || !lowKeyVarList.equals(highKeyVarList)) {
+ return false;
+ }
+ Index searchIndex = ((DataSourceIndex) idx).getIndex();
+ int numberOfKeyFields = searchIndex.getKeyFieldNames().size();
+
+ if (lowKeyVarList.size() != numberOfKeyFields || highKeyVarList.size() != numberOfKeyFields) {
+ return false;
+ }
+
+ IPhysicalPropertiesVector vector = op.getInputs().get(0).getValue().getDeliveredPhysicalProperties();
+ if (vector != null) {
+ for (ILocalStructuralProperty property : vector.getLocalProperties()) {
+ if (property.getPropertyType() == PropertyType.LOCAL_ORDER_PROPERTY) {
+ LocalOrderProperty orderProperty = (LocalOrderProperty) property;
+ if (orderProperty.getColumns().equals(lowKeyVarList)
+ && orderProperty.getOrders().stream().allMatch(o -> o.equals(OrderKind.ASC))) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index f3a88ab..f5ca79a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -127,7 +127,7 @@
return metadataProvider.buildBtreeRuntime(jobSpec, opSchema, typeEnv, context, true, false,
((DatasetDataSource) dataSource).getDataset(), primaryIndex.getIndexName(), null, null, true,
true, false, minFilterFieldIndexes, maxFilterFieldIndexes, tupleFilterFactory, outputLimit,
- false);
+ false, false);
default:
throw new AlgebricksException("Unknown datasource type");
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 1ee59bf..ff68e78 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -148,6 +148,7 @@
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeBatchPointSearchOperatorDescriptor;
import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
import org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
@@ -499,7 +500,7 @@
boolean retainMissing, Dataset dataset, String indexName, int[] lowKeyFields, int[] highKeyFields,
boolean lowKeyInclusive, boolean highKeyInclusive, boolean propagateFilter, int[] minFilterFieldIndexes,
int[] maxFilterFieldIndexes, ITupleFilterFactory tupleFilterFactory, long outputLimit,
- boolean isIndexOnlyPlan) throws AlgebricksException {
+ boolean isIndexOnlyPlan, boolean isPrimaryIndexPointSearch) throws AlgebricksException {
boolean isSecondary = true;
Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), dataset.getDatasetName());
@@ -540,11 +541,16 @@
BTreeSearchOperatorDescriptor btreeSearchOp;
if (dataset.getDatasetType() == DatasetType.INTERNAL) {
- btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields, highKeyFields,
- lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput, retainMissing,
- context.getMissingWriterFactory(), searchCallbackFactory, minFilterFieldIndexes,
- maxFilterFieldIndexes, propagateFilter, tupleFilterFactory, outputLimit, proceedIndexOnlyPlan,
- failValueForIndexOnlyPlan, successValueForIndexOnlyPlan);
+ btreeSearchOp = !isSecondary && isPrimaryIndexPointSearch
+ ? new LSMBTreeBatchPointSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields,
+ highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput,
+ retainMissing, context.getMissingWriterFactory(), searchCallbackFactory,
+ minFilterFieldIndexes, maxFilterFieldIndexes, tupleFilterFactory, outputLimit)
+ : new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields, highKeyFields,
+ lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput, retainMissing,
+ context.getMissingWriterFactory(), searchCallbackFactory, minFilterFieldIndexes,
+ maxFilterFieldIndexes, propagateFilter, tupleFilterFactory, outputLimit,
+ proceedIndexOnlyPlan, failValueForIndexOnlyPlan, successValueForIndexOnlyPlan);
} else {
btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields,
highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput, retainMissing,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
index 062b1c7..6c0badc 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
@@ -124,10 +124,14 @@
}
public boolean contains(ITupleReference tuple, long[] hashes) throws HyracksDataException {
+ computeHashes(tuple, hashes);
+ return contains(hashes);
+ }
+
+ public boolean contains(long[] hashes) throws HyracksDataException {
if (numPages == 0) {
return false;
}
- MurmurHash128Bit.hash3_x64_128(tuple, keyFields, SEED, hashes);
if (version == BLOCKED_BLOOM_FILTER_VERSION) {
return blockContains(hashes);
} else {
@@ -135,6 +139,10 @@
}
}
+ public void computeHashes(ITupleReference tuple, long[] hashes) {
+ MurmurHash128Bit.hash3_x64_128(tuple, keyFields, SEED, hashes);
+ }
+
private boolean blockContains(long[] hashes) throws HyracksDataException {
// take first hash to compute block id
long hash = Math.abs(hashes[0] % numBits);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/api/IBTreeLeafFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/api/IBTreeLeafFrame.java
index 2a3c1eb..7ccf08f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/api/IBTreeLeafFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/api/IBTreeLeafFrame.java
@@ -32,6 +32,9 @@
public int findTupleIndex(ITupleReference searchKey, ITreeIndexTupleReference pageTuple, MultiComparator cmp,
FindTupleMode ftm, FindTupleNoExactMatchPolicy ftp) throws HyracksDataException;
+ public int findTupleIndex(ITupleReference searchKey, ITreeIndexTupleReference pageTuple, MultiComparator cmp,
+ int startIndex) throws HyracksDataException;
+
public int findUpdateTupleIndex(ITupleReference tuple) throws HyracksDataException;
public int findUpsertTupleIndex(ITupleReference tuple) throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
index 001e250..128929c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
@@ -38,8 +38,8 @@
protected final int[] highKeyFields;
protected final boolean lowKeyInclusive;
protected final boolean highKeyInclusive;
- private final int[] minFilterFieldIndexes;
- private final int[] maxFilterFieldIndexes;
+ protected final int[] minFilterFieldIndexes;
+ protected final int[] maxFilterFieldIndexes;
protected final IIndexDataflowHelperFactory indexHelperFactory;
protected final boolean retainInput;
protected final boolean retainMissing;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/frames/BTreeFieldPrefixNSMLeafFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/frames/BTreeFieldPrefixNSMLeafFrame.java
index eb87c57..6584df3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/frames/BTreeFieldPrefixNSMLeafFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/frames/BTreeFieldPrefixNSMLeafFrame.java
@@ -769,6 +769,12 @@
}
@Override
+ public int findTupleIndex(ITupleReference searchKey, ITreeIndexTupleReference pageTuple, MultiComparator cmp,
+ int startIndex) throws HyracksDataException {
+ throw new UnsupportedOperationException("Stateful search is not supported by BTreeFieldPrefixNSMLeafFrame");
+ }
+
+ @Override
public int getPageHeaderSize() {
return NEXT_LEAF_OFFSET;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/frames/BTreeNSMLeafFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/frames/BTreeNSMLeafFrame.java
index 09a4db4..b1add3c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/frames/BTreeNSMLeafFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/frames/BTreeNSMLeafFrame.java
@@ -313,6 +313,12 @@
}
@Override
+ public int findTupleIndex(ITupleReference searchKey, ITreeIndexTupleReference pageTuple, MultiComparator cmp,
+ int startIndex) throws HyracksDataException {
+ return slotManager.findTupleIndex(searchKey, pageTuple, cmp, startIndex);
+ }
+
+ @Override
public void setMultiComparator(MultiComparator cmp) {
this.cmp = cmp;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/frames/OrderedSlotManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/frames/OrderedSlotManager.java
index bb06ec0..3931ebc 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/frames/OrderedSlotManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/frames/OrderedSlotManager.java
@@ -104,6 +104,71 @@
}
@Override
+ public int findTupleIndex(ITupleReference searchKey, ITreeIndexTupleReference frameTuple, MultiComparator multiCmp,
+ int startIndex) throws HyracksDataException {
+ int tupleCount = frame.getTupleCount();
+ if (tupleCount == 0) {
+ return -1;
+ } else if (startIndex >= tupleCount) {
+ return -tupleCount - 1;
+ }
+
+ int step = 1;
+ int index = startIndex;
+ int prevIndex = index;
+
+ // at this point startIndex < tupleCount, so the first probe below stays within the frame
+ // use exponential search to locate the index range that contains the search key
+ // https://en.wikipedia.org/wiki/Exponential_search
+ while (index < tupleCount) {
+ frameTuple.resetByTupleIndex(frame, index);
+ int cmp = multiCmp.compare(searchKey, frameTuple);
+ if (cmp == 0) {
+ return index;
+ } else if (cmp > 0) {
+ prevIndex = index;
+ if (index + step < tupleCount) {
+ index = index + step;
+ step = step << 1;
+ } else {
+ if (index == tupleCount - 1) {
+ // the search key is greater than the last tuple, so it is not in this frame
+ return -tupleCount - 1;
+ } else {
+ index = tupleCount - 1;
+ }
+ }
+ } else {
+ break;
+ }
+ }
+
+ if (index == startIndex) {
+ return -index - 1;
+ }
+
+ // perform binary search strictly between prevIndex and index
+ // if the key is present, its index k satisfies prevIndex < k < index
+ // adapted from Collections.binarySearch; a miss returns -(insertion index) - 1
+ int low = prevIndex + 1;
+ int high = index - 1;
+
+ while (low <= high) {
+ int mid = (low + high) >>> 1;
+ frameTuple.resetByTupleIndex(frame, mid);
+ int cmp = multiCmp.compare(searchKey, frameTuple);
+ if (cmp < 0) {
+ high = mid - 1;
+ } else if (cmp > 0) {
+ low = mid + 1;
+ } else {
+ return mid;
+ }
+ }
+ return -low - 1;
+ }
+
+ @Override
public int insertSlot(int tupleIndex, int tupleOff) {
int slotOff = getSlotOff(tupleIndex);
if (tupleIndex == GREATEST_KEY_INDICATOR) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
index 3a062af..1f0e447 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
@@ -887,7 +887,7 @@
.getOrDefault(HyracksConstants.INDEX_CURSOR_STATS, NoOpIndexCursorStats.INSTANCE));
}
- public BTreeRangeSearchCursor createPointCursor(boolean exclusive) {
+ public BTreeRangeSearchCursor createPointCursor(boolean exclusive, boolean stateful) {
return createSearchCursor(exclusive);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BatchPredicate.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BatchPredicate.java
new file mode 100644
index 0000000..acfde93
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BatchPredicate.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.btree.impls;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.PermutingFrameTupleReference;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public class BatchPredicate extends RangePredicate {
+
+ private static final long serialVersionUID = 1L;
+
+ protected final FrameTupleReference keyTuple;
+ protected final FrameTupleReference minFilterTuple;
+ protected final FrameTupleReference maxFilterTuple;
+
+ protected int keyIndex = -1;
+ protected IFrameTupleAccessor accessor;
+
+ public BatchPredicate(IFrameTupleAccessor accessor, MultiComparator keyCmp, int[] keyFields,
+ int[] minFilterKeyFields, int[] maxFieldKeyFields) {
+ super(null, null, true, true, keyCmp, keyCmp);
+ this.keyIndex = 0;
+ this.accessor = accessor;
+ if (keyFields != null && keyFields.length > 0) {
+ this.keyTuple = new PermutingFrameTupleReference(keyFields);
+ } else {
+ this.keyTuple = new FrameTupleReference();
+ }
+ if (minFilterKeyFields != null && minFilterKeyFields.length > 0) {
+ this.minFilterTuple = new PermutingFrameTupleReference(minFilterKeyFields);
+ } else {
+ this.minFilterTuple = null;
+ }
+ if (maxFieldKeyFields != null && maxFieldKeyFields.length > 0) {
+ this.maxFilterTuple = new PermutingFrameTupleReference(maxFieldKeyFields);
+ } else {
+ this.maxFilterTuple = null;
+ }
+
+ }
+
+ public void reset(IFrameTupleAccessor accessor) {
+ this.keyIndex = -1;
+ this.accessor = accessor;
+ }
+
+ private boolean isValid() {
+ return accessor != null && keyIndex >= 0 && keyIndex < accessor.getTupleCount();
+ }
+
+ @Override
+ public ITupleReference getLowKey() {
+ return isValid() ? keyTuple : null;
+ }
+
+ @Override
+ public ITupleReference getHighKey() {
+ return isValid() ? keyTuple : null;
+ }
+
+ @Override
+ public ITupleReference getMinFilterTuple() {
+ return isValid() ? minFilterTuple : null;
+ }
+
+ @Override
+ public ITupleReference getMaxFilterTuple() {
+ return isValid() ? maxFilterTuple : null;
+ }
+
+ @Override
+ public boolean isPointPredicate(MultiComparator originalKeyComparator) throws HyracksDataException {
+ return true;
+ }
+
+ public boolean hasNext() {
+ return accessor != null && keyIndex < accessor.getTupleCount() - 1;
+ }
+
+ public void next() {
+ keyIndex++;
+ if (isValid()) {
+ keyTuple.reset(accessor, keyIndex);
+ if (minFilterTuple != null) {
+ minFilterTuple.reset(accessor, keyIndex);
+ }
+ if (maxFilterTuple != null) {
+ maxFilterTuple.reset(accessor, keyIndex);
+ }
+ }
+ }
+
+ public int getKeyIndex() {
+ return keyIndex;
+ }
+
+ public int getNumKeys() {
+ return accessor.getTupleCount();
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTree.java
index 6459471..ae6bbaa 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTree.java
@@ -38,6 +38,7 @@
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
import org.apache.hyracks.storage.common.NoOpIndexCursorStats;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
@@ -77,7 +78,6 @@
ctx.reset();
ctx.setPred((RangePredicate) searchPred);
ctx.setCursor(cursor);
- // simple index scan
if (ctx.getPred().getLowKeyComparator() == null) {
ctx.getPred().setLowKeyComparator(ctx.getCmp());
}
@@ -87,87 +87,52 @@
cursor.setBufferCache(bufferCache);
cursor.setFileId(getFileId());
- DiskBTreeRangeSearchCursor diskCursor = (DiskBTreeRangeSearchCursor) cursor;
-
- if (diskCursor.numSearchPages() == 0) {
- // we have to search from root to leaf
- ICachedPage rootNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(getFileId(), rootPage), false);
- diskCursor.addSearchPage(rootPage);
- searchDown(rootNode, rootPage, ctx, diskCursor);
- } else {
- // we first check whether the leaf page matches because page may be shifted during cursor.hasNext
- if (ctx.getLeafFrame().getPage() != diskCursor.getPage()) {
- ctx.getLeafFrame().setPage(diskCursor.getPage());
- ctx.getCursorInitialState().setPage(diskCursor.getPage());
- ctx.getCursorInitialState().setPageId(diskCursor.getPageId());
- }
-
- if (fitInPage(ctx.getPred().getLowKey(), ctx.getPred().getLowKeyComparator(), ctx.getLeafFrame())) {
- // the input still falls into the previous search leaf
- diskCursor.open(ctx.getCursorInitialState(), searchPred);
- } else {
- // unpin the previous leaf page
- bufferCache.unpin(ctx.getLeafFrame().getPage());
- diskCursor.removeLastSearchPage();
-
- ICachedPage page = searchUp(ctx, diskCursor);
- int pageId = diskCursor.getLastSearchPage();
-
- searchDown(page, pageId, ctx, diskCursor);
+ if (cursor instanceof DiskBTreePointSearchCursor) {
+ DiskBTreePointSearchCursor pointCursor = (DiskBTreePointSearchCursor) cursor;
+ int lastPageId = pointCursor.getLastPageId();
+ if (lastPageId != BufferCache.INVALID_PAGEID) {
+ // check whether the last leaf page contains this key
+ ICachedPage lastPage =
+ bufferCache.pin(BufferedFileHandle.getDiskPageId(getFileId(), lastPageId), false);
+ ctx.getLeafFrame().setPage(lastPage);
+ if (fitInPage(ctx.getPred().getLowKey(), ctx.getPred().getLowKeyComparator(), ctx.getLeafFrame())) {
+ // use this page
+ ctx.getCursorInitialState().setPage(lastPage);
+ ctx.getCursorInitialState().setPageId(lastPageId);
+ pointCursor.open(ctx.getCursorInitialState(), searchPred);
+ return;
+ } else {
+ // release the last page and clear the states of this cursor
+ // then retry the search from root to leaf
+ bufferCache.unpin(lastPage);
+ pointCursor.clearSearchState();
+ }
}
}
- }
-
- private ICachedPage searchUp(BTreeOpContext ctx, DiskBTreeRangeSearchCursor cursor) throws HyracksDataException {
- int index = cursor.numSearchPages() - 1;
- // no need to check root page
- for (; index >= 0; index--) {
- int pageId = cursor.getLastSearchPage();
- ICachedPage page = bufferCache.pin(BufferedFileHandle.getDiskPageId(getFileId(), pageId), false);
- ctx.getInteriorFrame().setPage(page);
- if (index == 0 || fitInPage(ctx.getPred().getLowKey(), ctx.getPred().getLowKeyComparator(),
- ctx.getInteriorFrame())) {
- // we've found the right page
- return page;
- } else {
- // unpin the current page
- bufferCache.unpin(page);
- cursor.removeLastSearchPage();
- }
- }
-
- // if no page is available (which is the case for single-level BTree)
- // we simply return the root page
- ICachedPage page = bufferCache.pin(BufferedFileHandle.getDiskPageId(getFileId(), rootPage), false);
- cursor.addSearchPage(rootPage);
- return page;
+ ICachedPage rootNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(getFileId(), rootPage), false);
+ searchDown(rootNode, rootPage, ctx, cursor);
}
private boolean fitInPage(ITupleReference key, MultiComparator comparator, IBTreeFrame frame)
throws HyracksDataException {
+ // assume that search keys are sorted (non-decreasing)
ITupleReference rightmostTuple = frame.getRightmostTuple();
int cmp = comparator.compare(key, rightmostTuple);
- if (cmp > 0) {
- return false;
- }
- ITupleReference leftmostTuple = frame.getLeftmostTuple();
- return comparator.compare(key, leftmostTuple) >= 0;
+ return cmp <= 0;
}
- private void searchDown(ICachedPage page, int pageId, BTreeOpContext ctx, DiskBTreeRangeSearchCursor cursor)
+ private void searchDown(ICachedPage page, int pageId, BTreeOpContext ctx, ITreeIndexCursor cursor)
throws HyracksDataException {
ICachedPage currentPage = page;
ctx.getInteriorFrame().setPage(currentPage);
-
try {
int childPageId = pageId;
while (!ctx.getInteriorFrame().isLeaf()) {
// walk down the tree until we find the leaf
childPageId = ctx.getInteriorFrame().getChildPageId(ctx.getPred());
-
- // save the child page tuple index
- cursor.addSearchPage(childPageId);
bufferCache.unpin(currentPage);
+ pageId = childPageId;
+
currentPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(getFileId(), childPageId), false);
ctx.getInteriorFrame().setPage(currentPage);
}
@@ -233,9 +198,9 @@
}
@Override
- public BTreeRangeSearchCursor createPointCursor(boolean exclusive) {
+ public BTreeRangeSearchCursor createPointCursor(boolean exclusive, boolean stateful) {
IBTreeLeafFrame leafFrame = (IBTreeLeafFrame) btree.getLeafFrameFactory().createFrame();
- return new DiskBTreePointSearchCursor(leafFrame, exclusive);
+ return new DiskBTreePointSearchCursor(leafFrame, exclusive, stateful);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTreePointSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTreePointSearchCursor.java
index 7814e60..1bf3ecf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTreePointSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTreePointSearchCursor.java
@@ -25,13 +25,27 @@
import org.apache.hyracks.storage.am.common.ophelpers.FindTupleNoExactMatchPolicy;
import org.apache.hyracks.storage.common.ICursorInitialState;
import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
public class DiskBTreePointSearchCursor extends DiskBTreeRangeSearchCursor {
+ /**
+ * A stateful cursor keeps the search state (last search page Id + index) across multiple searches
+ * until {@link #clearSearchState()} is called explicitly.
+ */
+ private final boolean stateful;
private boolean nextHasBeenCalled;
- public DiskBTreePointSearchCursor(IBTreeLeafFrame frame, boolean exclusiveLatchNodes) {
+ private int lastPageId = BufferCache.INVALID_PAGEID;
+ private int lastTupleIndex = 0;
+
+ public DiskBTreePointSearchCursor(IBTreeLeafFrame frame, boolean exclusiveLatchNodes, boolean stateful) {
super(frame, exclusiveLatchNodes);
+ this.stateful = stateful;
+ }
+
+ public DiskBTreePointSearchCursor(IBTreeLeafFrame frame, boolean exclusiveLatchNodes) {
+ this(frame, exclusiveLatchNodes, false);
}
@Override
@@ -71,6 +85,32 @@
// only get the low key position
tupleIndex = getLowKeyIndex();
+ if (stateful) {
+ lastPageId = pageId;
+ if (tupleIndex >= 0) {
+ lastTupleIndex = tupleIndex;
+ } else {
+ lastTupleIndex = -tupleIndex - 1;
+ }
+ }
+ }
+
+ public int getLastPageId() {
+ return lastPageId;
+ }
+
+ @Override
+ protected int getLowKeyIndex() throws HyracksDataException {
+ if (stateful) {
+ return frame.findTupleIndex(lowKey, frameTuple, lowKeyCmp, lastTupleIndex);
+ } else {
+ return super.getLowKeyIndex();
+ }
+ }
+
+ public void clearSearchState() {
+ this.lastPageId = BufferCache.INVALID_PAGEID;
+ this.lastTupleIndex = 0;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTreeRangeSearchCursor.java
index d26378b..d788398 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTreeRangeSearchCursor.java
@@ -19,25 +19,20 @@
package org.apache.hyracks.storage.am.btree.impls;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.btree.api.IBTreeLeafFrame;
import org.apache.hyracks.storage.common.IIndexCursorStats;
+import org.apache.hyracks.storage.common.NoOpIndexCursorStats;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
public class DiskBTreeRangeSearchCursor extends BTreeRangeSearchCursor {
- // keep track of the pages (root -> leaf) we've searched
- protected final List<Integer> searchPages = new ArrayList<>(5);
-
public DiskBTreeRangeSearchCursor(IBTreeLeafFrame frame, boolean exclusiveLatchNodes) {
- super(frame, exclusiveLatchNodes);
+ this(frame, exclusiveLatchNodes, NoOpIndexCursorStats.INSTANCE);
}
- public DiskBTreeRangeSearchCursor(IBTreeLeafFrame frame, boolean exclusiveLatchNodes, IIndexCursorStats stats) {
+ protected DiskBTreeRangeSearchCursor(IBTreeLeafFrame frame, boolean exclusiveLatchNodes, IIndexCursorStats stats) {
super(frame, exclusiveLatchNodes, stats);
}
@@ -50,7 +45,6 @@
fetchNextLeafPage(nextLeafPage);
tupleIndex = 0;
// update page ids and positions
- searchPages.set(searchPages.size() - 1, nextLeafPage);
stopTupleIndex = getHighKeyIndex();
if (stopTupleIndex < 0) {
return false;
@@ -67,28 +61,6 @@
return true;
}
- @Override
- protected void resetBeforeOpen() throws HyracksDataException {
- // do nothing
- // we allow a disk btree range cursor be stateful, that is, the next search can be based on the previous search
- }
-
- public int numSearchPages() {
- return searchPages.size();
- }
-
- public void addSearchPage(int page) {
- searchPages.add(page);
- }
-
- public int getLastSearchPage() {
- return searchPages.get(searchPages.size() - 1);
- }
-
- public int removeLastSearchPage() {
- return searchPages.remove(searchPages.size() - 1);
- }
-
public ICachedPage getPage() {
return page;
}
@@ -104,9 +76,4 @@
return bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
}
- @Override
- public void doClose() throws HyracksDataException {
- super.doClose();
- searchPages.clear();
- }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/FieldPrefixSlotManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/FieldPrefixSlotManager.java
index cd0a2d3..86ebe03 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/FieldPrefixSlotManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/FieldPrefixSlotManager.java
@@ -44,19 +44,23 @@
private BTreeFieldPrefixNSMLeafFrame frame;
private MultiComparator cmp;
+ @Override
public int decodeFirstSlotField(int slot) {
return (slot & 0xFF000000) >>> 24;
}
+ @Override
public int decodeSecondSlotField(int slot) {
return slot & 0x00FFFFFF;
}
+ @Override
public int encodeSlotFields(int firstField, int secondField) {
return ((firstField & 0x000000FF) << 24) | (secondField & 0x00FFFFFF);
}
// returns prefix slot number, or TUPLE_UNCOMPRESSED of no match was found
+ @Override
public int findPrefix(ITupleReference tuple, ITreeIndexTupleReference framePrefixTuple)
throws HyracksDataException {
int prefixMid;
@@ -194,30 +198,37 @@
}
}
+ @Override
public int getPrefixSlotStartOff() {
return buf.capacity() - slotSize;
}
+ @Override
public int getPrefixSlotEndOff() {
return buf.capacity() - slotSize * frame.getPrefixTupleCount();
}
+ @Override
public int getTupleSlotStartOff() {
return getPrefixSlotEndOff() - slotSize;
}
+ @Override
public int getTupleSlotEndOff() {
return buf.capacity() - slotSize * (frame.getPrefixTupleCount() + frame.getTupleCount());
}
+ @Override
public int getSlotSize() {
return slotSize;
}
+ @Override
public void setSlot(int offset, int value) {
frame.getBuffer().putInt(offset, value);
}
+ @Override
public int insertSlot(int slot, int tupleOff) {
int slotNum = decodeSecondSlotField(slot);
if (slotNum == ERROR_INDICATOR) {
@@ -241,14 +252,17 @@
}
}
+ @Override
public int getPrefixSlotOff(int tupleIndex) {
return getPrefixSlotStartOff() - tupleIndex * slotSize;
}
+ @Override
public int getTupleSlotOff(int tupleIndex) {
return getTupleSlotStartOff() - tupleIndex * slotSize;
}
+ @Override
public void setPrefixSlot(int tupleIndex, int slot) {
buf.putInt(getPrefixSlotOff(tupleIndex), slot);
}
@@ -276,6 +290,12 @@
}
@Override
+ public int findTupleIndex(ITupleReference searchKey, ITreeIndexTupleReference frameTuple, MultiComparator multiCmp,
+ int startIndex) throws HyracksDataException {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
public int getSlotStartOff() {
throw new UnsupportedOperationException("Not implemented.");
}
@@ -295,7 +315,9 @@
throw new UnsupportedOperationException("Not implemented.");
}
+ @Override
public void setMultiComparator(MultiComparator cmp) {
this.cmp = cmp;
}
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISlotManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISlotManager.java
index cd58387..3da722d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISlotManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISlotManager.java
@@ -29,6 +29,9 @@
public int findTupleIndex(ITupleReference searchKey, ITreeIndexTupleReference frameTuple, MultiComparator multiCmp,
FindTupleMode mode, FindTupleNoExactMatchPolicy matchPolicy) throws HyracksDataException;
+ public int findTupleIndex(ITupleReference searchKey, ITreeIndexTupleReference frameTuple, MultiComparator multiCmp,
+ int startIndex) throws HyracksDataException;
+
public int getGreatestKeyIndicator();
public int getErrorIndicator();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 142e879..fae0d75 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -92,7 +92,7 @@
protected ArrayTupleBuilder nonFilterTupleBuild;
protected final ISearchOperationCallbackFactory searchCallbackFactory;
protected boolean failed = false;
- private IOperatorStats stats;
+ protected IOperatorStats stats;
// Used when the result of the search operation callback needs to be passed.
protected boolean appendSearchCallbackProceedResult;
@@ -341,7 +341,7 @@
}
}
- private void writeTupleToOutput(ITupleReference tuple) throws IOException {
+ protected void writeTupleToOutput(ITupleReference tuple) throws IOException {
try {
for (int i = 0; i < tuple.getFieldCount(); i++) {
dos.write(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
@@ -393,7 +393,7 @@
* is used by ITupleFilter
*
*/
- private static class ReferenceFrameTupleReference implements IFrameTupleReference {
+ protected static class ReferenceFrameTupleReference implements IFrameTupleReference {
private ITupleReference tuple;
public IFrameTupleReference reset(ITupleReference tuple) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
new file mode 100644
index 0000000..23be280
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.btree.dataflow;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+
+public class LSMBTreeBatchPointSearchOperatorDescriptor extends BTreeSearchOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+ // Forwards to the superclass; its four extra arguments are fixed to false, false, null, null (meaning not visible here).
+ public LSMBTreeBatchPointSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
+ int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
+ IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing,
+ IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory,
+ int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, ITupleFilterFactory tupleFilterFactory,
+ long outputLimit) {
+ super(spec, outRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory,
+ retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, minFilterFieldIndexes,
+ maxFilterFieldIndexes, false, tupleFilterFactory, outputLimit, false, null, null);
+ }
+ // Creates the batch point-search runtime for one partition, using the record descriptor of this activity's input 0.
+ @Override
+ public LSMBTreeBatchPointSearchOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ return new LSMBTreeBatchPointSearchOperatorNodePushable(ctx, partition,
+ recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), lowKeyFields, highKeyFields,
+ lowKeyInclusive, highKeyInclusive, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
+ retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, tupleFilterFactory,
+ outputLimit);
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
new file mode 100644
index 0000000..596f4b0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.dataflow;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorNodePushable;
+import org.apache.hyracks.storage.am.btree.impls.BatchPredicate;
+import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeBatchPointSearchCursor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.ISearchPredicate;
+
+public class LSMBTreeBatchPointSearchOperatorNodePushable extends BTreeSearchOperatorNodePushable {
+ // Set from lowKeyFields in the constructor and passed to the BatchPredicate as its key fields.
+ private final int[] keyFields;
+ // Delegates to BTreeSearchOperatorNodePushable, fixing its four extra arguments to false, false, null, null.
+ public LSMBTreeBatchPointSearchOperatorNodePushable(IHyracksTaskContext ctx, int partition,
+ RecordDescriptor inputRecDesc, int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive,
+ boolean highKeyInclusive, int[] minFilterKeyFields, int[] maxFilterKeyFields,
+ IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing,
+ IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory,
+ ITupleFilterFactory tupleFilterFactory, long outputLimit) throws HyracksDataException {
+ super(ctx, partition, inputRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
+ minFilterKeyFields, maxFilterKeyFields, indexHelperFactory, retainInput, retainMissing,
+ missingWriterFactory, searchCallbackFactory, false, tupleFilterFactory, outputLimit, false, null, null);
+ this.keyFields = lowKeyFields;
+ }
+ // The cursor is built from the operation context of the index accessor, which is cast to ILSMIndexAccessor.
+ @Override
+ protected IIndexCursor createCursor() throws HyracksDataException {
+ ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
+ return new LSMBTreeBatchPointSearchCursor(lsmAccessor.getOpContext());
+ }
+ // The same comparator instance (built with lowKey) serves as both lowKeySearchCmp and highKeySearchCmp.
+ @Override
+ protected ISearchPredicate createSearchPredicate() {
+ ITreeIndex treeIndex = (ITreeIndex) index;
+ lowKeySearchCmp =
+ highKeySearchCmp = BTreeUtils.getSearchMultiComparator(treeIndex.getComparatorFactories(), lowKey);
+ return new BatchPredicate(accessor, lowKeySearchCmp, keyFields, minFilterFieldIndexes, maxFilterFieldIndexes);
+ }
+ // Issues one search per non-empty frame; the cursor is closed after each frame, even if the search fails.
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ if (accessor.getTupleCount() > 0) {
+ BatchPredicate batchPred = (BatchPredicate) searchPred;
+ batchPred.reset(accessor);
+ try {
+ indexAccessor.search(cursor, batchPred);
+ writeSearchResults();
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ } finally {
+ cursor.close();
+ }
+ }
+ }
+ // Writes every cursor result to the output; tuples rejected by tupleFilter still count toward the tuple counter.
+ protected void writeSearchResults() throws IOException {
+ long matchingTupleCount = 0;
+ LSMBTreeBatchPointSearchCursor batchCursor = (LSMBTreeBatchPointSearchCursor) cursor;
+ int tupleIndex = 0;
+ while (cursor.hasNext()) {
+ cursor.next();
+ matchingTupleCount++;
+ ITupleReference tuple = cursor.getTuple();
+ if (tupleFilter != null && !tupleFilter.accept(referenceFilterTuple.reset(tuple))) {
+ continue;
+ }
+ tb.reset();
+ // NOTE(review): range starts at the previous match index, inclusive; nothing is emitted after the last match -- confirm
+ if (retainInput && retainMissing) {
+ appendMissingTuple(tupleIndex, batchCursor.getKeyIndex());
+ }
+ // remember the key index of this match; it is the start of the next missing-tuple range
+ tupleIndex = batchCursor.getKeyIndex();
+ // with retainInput, the fields of the input tuple at tupleIndex are written before the index tuple's fields
+ if (retainInput) {
+ frameTuple.reset(accessor, tupleIndex);
+ for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+ dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+ tb.addFieldEndOffset();
+ }
+ }
+ writeTupleToOutput(tuple);
+ FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+ if (outputLimit >= 0 && ++outputCount >= outputLimit) {
+ finished = true;
+ break;
+ }
+ }
+ stats.getTupleCounter().update(matchingTupleCount);
+
+ }
+ // Emits, for each input tuple index in [start, end), the input tuple concatenated with nonMatchTupleBuild's contents.
+ private void appendMissingTuple(int start, int end) throws HyracksDataException {
+ for (int i = start; i < end; i++) {
+ FrameUtils.appendConcatToWriter(writer, appender, accessor, i, nonMatchTupleBuild.getFieldEndOffsets(),
+ nonMatchTupleBuild.getByteArray(), 0, nonMatchTupleBuild.getSize());
+ }
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeBatchPointSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeBatchPointSearchCursor.java
new file mode 100644
index 0000000..8ab6fb1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeBatchPointSearchCursor.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.btree.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.btree.impls.BatchPredicate;
+import org.apache.hyracks.storage.am.btree.impls.DiskBTreePointSearchCursor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+
+/**
+ * A point-search cursor that looks up the keys of a {@link BatchPredicate} one after another.
+ * Assumption: the search keys must be sorted in increasing order.
+ * The search state of {@link DiskBTreePointSearchCursor} instances is cleared when the cursors are closed.
+ */
+public class LSMBTreeBatchPointSearchCursor extends LSMBTreePointSearchCursor {
+
+ public LSMBTreeBatchPointSearchCursor(ILSMIndexOperationContext opCtx) {
+ super(opCtx);
+ }
+ // Advances through the batch until a key is found or the batch is exhausted; closes the cursor of the previous hit.
+ @Override
+ public boolean doHasNext() throws HyracksDataException {
+ BatchPredicate batchPred = (BatchPredicate) predicate;
+ while (!foundTuple && batchPred.hasNext()) {
+ batchPred.next();
+ if (foundIn >= 0) {
+ btreeCursors[foundIn].close();
+ foundIn = -1;
+ }
+ foundTuple = super.doHasNext();
+ }
+ return foundTuple;
+ }
+ // Consumes the current match so that the next doHasNext() call moves on to the next key of the batch.
+ @Override
+ public void doNext() throws HyracksDataException {
+ foundTuple = false;
+ }
+ // A component is searched only if the superclass accepts it and its filter (when one applies) is satisfied.
+ @Override
+ protected boolean isSearchCandidate(int componentIndex) throws HyracksDataException {
+ if (!super.isSearchCandidate(componentIndex)) {
+ return false;
+ }
+ // the component filter is consulted only when both the min and max filter tuples are present
+ ITupleReference minFilterKey = predicate.getMinFilterTuple();
+ ITupleReference maxFileterKey = predicate.getMaxFilterTuple();
+ boolean filtered = minFilterKey != null && maxFileterKey != null;
+ return !filtered || operationalComponents.get(componentIndex).getLSMComponentFilter().satisfy(minFilterKey,
+ maxFileterKey, opCtx.getFilterCmp());
+ }
+ // After the superclass close, also clears the search state kept by DiskBTreePointSearchCursor instances.
+ @Override
+ protected void closeCursors() throws HyracksDataException {
+ super.closeCursors();
+ if (btreeCursors != null) {
+ // clear search states of btree cursors
+ for (int i = 0; i < numBTrees; ++i) {
+ if (btreeCursors[i] != null) {
+ if (btreeCursors[i] instanceof DiskBTreePointSearchCursor) {
+ ((DiskBTreePointSearchCursor) btreeCursors[i]).clearSearchState();
+ }
+ }
+ }
+ }
+ }
+ // Returns the key index reported by the current BatchPredicate.
+ public int getKeyIndex() {
+ return ((BatchPredicate) predicate).getKeyIndex();
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
index c376262..d4903d9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
@@ -44,23 +44,24 @@
public class LSMBTreePointSearchCursor extends EnforcedIndexCursor implements ILSMIndexCursor {
- private ITreeIndexCursor[] btreeCursors;
- private final ILSMIndexOperationContext opCtx;
- private ISearchOperationCallback searchCallback;
- private RangePredicate predicate;
- private boolean includeMutableComponent;
- private int numBTrees;
+ protected ITreeIndexCursor[] btreeCursors;
+ protected final ILSMIndexOperationContext opCtx;
+ protected ISearchOperationCallback searchCallback;
+ protected RangePredicate predicate;
+ protected boolean includeMutableComponent;
+ protected int numBTrees;
private BTreeAccessor[] btreeAccessors;
- private BloomFilter[] bloomFilters;
- private ILSMHarness lsmHarness;
+ protected BloomFilter[] bloomFilters;
+ protected ILSMHarness lsmHarness;
private boolean nextHasBeenCalled;
- private boolean foundTuple;
- private int foundIn = -1;
- private ITupleReference frameTuple;
- private List<ILSMComponent> operationalComponents;
- private boolean resultOfSearchCallbackProceed = false;
+ protected boolean foundTuple;
+ protected int foundIn = -1;
+ protected ITupleReference frameTuple;
+ protected List<ILSMComponent> operationalComponents;
+ protected boolean resultOfSearchCallbackProceed = false;
- private final long[] hashes = BloomFilter.createHashArray();
+ protected final long[] hashes = BloomFilter.createHashArray();
+ protected boolean hashComputed = false;
public LSMBTreePointSearchCursor(ILSMIndexOperationContext opCtx) {
this.opCtx = opCtx;
@@ -73,9 +74,10 @@
} else if (foundTuple) {
return true;
}
+ hashComputed = false;
boolean reconciled = false;
for (int i = 0; i < numBTrees; ++i) {
- if (bloomFilters[i] != null && !bloomFilters[i].contains(predicate.getLowKey(), hashes)) {
+ if (!isSearchCandidate(i)) {
continue;
}
btreeAccessors[i].search(btreeCursors[i], predicate);
@@ -141,12 +143,27 @@
return false;
}
+ protected boolean isSearchCandidate(int componentIndex) throws HyracksDataException {
+ if (bloomFilters[componentIndex] != null) {
+ if (!hashComputed) {
+ // all bloom filters share the same hash function, so the hashes of the low key are
+ // computed only once and reused for every component until hashComputed is reset
+ bloomFilters[componentIndex].computeHashes(predicate.getLowKey(), hashes);
+ hashComputed = true;
+ }
+ return bloomFilters[componentIndex].contains(hashes); // candidate only if the filter contains the hashes
+ } else {
+ return true; // no bloom filter for this component: always search it
+ }
+ }
+
@Override
public void doClose() throws HyracksDataException {
try {
closeCursors();
nextHasBeenCalled = false;
foundTuple = false;
+ hashComputed = false;
} finally {
if (lsmHarness != null) {
lsmHarness.endSearch(opCtx);
@@ -196,7 +213,7 @@
if (btreeAccessors[i] == null) {
btreeAccessors[i] = btree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- btreeCursors[i] = btreeAccessors[i].createPointCursor(false);
+ btreeCursors[i] = btreeAccessors[i].createPointCursor(false, false);
} else {
// re-use
btreeAccessors[i].reset(btree, NoOpIndexAccessParameters.INSTANCE);
@@ -205,6 +222,7 @@
}
nextHasBeenCalled = false;
foundTuple = false;
+ hashComputed = false;
}
private void destroyAndNullifyCursorAtIndex(int i) throws HyracksDataException {
@@ -259,7 +277,7 @@
return null;
}
- private void closeCursors() throws HyracksDataException {
+ protected void closeCursors() throws HyracksDataException {
if (btreeCursors != null) {
for (int i = 0; i < numBTrees; ++i) {
if (btreeCursors[i] != null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java
index c2644c1..b7eb115 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java
@@ -132,7 +132,7 @@
}
if (btreeCursors[i] == null) {
// need to create a new one
- btreeCursors[i] = btreeAccessors[i].createPointCursor(false);
+ btreeCursors[i] = btreeAccessors[i].createPointCursor(false, false);
} else {
// close
btreeCursors[i].close();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/UnorderedSlotManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/UnorderedSlotManager.java
index 22ca4b9..69faad3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/UnorderedSlotManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/UnorderedSlotManager.java
@@ -90,6 +90,12 @@
}
@Override
+ public int findTupleIndex(ITupleReference searchKey, ITreeIndexTupleReference pageTuple, MultiComparator cmp,
+ int startIndex) {
+ throw new UnsupportedOperationException("Stateful search is not supported by UnorderedSlotManager");
+ }
+
+ @Override
public int insertSlot(int tupleIndex, int tupleOff) {
int slotOff = getSlotEndOff() - slotSize;
setSlot(slotOff, tupleOff);
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreePointSearchCursorTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreePointSearchCursorTest.java
index c2a69e1..2210372 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreePointSearchCursorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreePointSearchCursorTest.java
@@ -101,6 +101,6 @@
@Override
protected IIndexCursor createCursor(IIndexAccessor accessor) {
- return ((BTreeAccessor) accessor).createPointCursor(false);
+ return ((BTreeAccessor) accessor).createPointCursor(false, false);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreeSearchCursorTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreeSearchCursorTest.java
index 15774c9..c240a73 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreeSearchCursorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreeSearchCursorTest.java
@@ -105,20 +105,22 @@
insertBTree(keys, btree);
// forward searches
- Assert.assertTrue(performBatchLookups(keys, btree, leafFrame, interiorFrame, minSearchKey, maxSearchKey));
+ Assert.assertTrue(
+ performBatchLookups(keys, btree, leafFrame, interiorFrame, minSearchKey, maxSearchKey, false));
+ Assert.assertTrue(performBatchLookups(keys, btree, leafFrame, interiorFrame, minSearchKey, maxSearchKey, true));
btree.deactivate();
btree.destroy();
}
private boolean performBatchLookups(ArrayList<Integer> keys, BTree btree, IBTreeLeafFrame leafFrame,
- IBTreeInteriorFrame interiorFrame, int minKey, int maxKey) throws Exception {
+ IBTreeInteriorFrame interiorFrame, int minKey, int maxKey, boolean stateful) throws Exception {
ArrayList<Integer> results = new ArrayList<>();
ArrayList<Integer> expectedResults = new ArrayList<>();
BTreeAccessor indexAccessor = btree.createAccessor(
new IndexAccessParameters(TestOperationCallback.INSTANCE, TestOperationCallback.INSTANCE));
- IIndexCursor pointCursor = indexAccessor.createPointCursor(false);
+ IIndexCursor pointCursor = indexAccessor.createPointCursor(false, stateful);
try {
for (int i = minKey; i < maxKey; i++) {
results.clear();
@@ -144,18 +146,18 @@
if (results.size() == expectedResults.size()) {
for (int k = 0; k < results.size(); k++) {
if (!results.get(k).equals(expectedResults.get(k))) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("DIFFERENT RESULTS AT: i=" + i + " k=" + k);
- LOGGER.info(results.get(k) + " " + expectedResults.get(k));
+ if (LOGGER.isErrorEnabled()) {
+ LOGGER.error("DIFFERENT RESULTS AT: i=" + i + " k=" + k);
+ LOGGER.error(results.get(k) + " " + expectedResults.get(k));
}
return false;
}
}
} else {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("UNEQUAL NUMBER OF RESULTS AT: i=" + i);
- LOGGER.info("RESULTS: " + results.size());
- LOGGER.info("EXPECTED RESULTS: " + expectedResults.size());
+ if (LOGGER.isErrorEnabled()) {
+ LOGGER.error("UNEQUAL NUMBER OF RESULTS AT: i=" + i);
+ LOGGER.error("RESULTS: " + results.size());
+ LOGGER.error("EXPECTED RESULTS: " + expectedResults.size());
}
return false;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/cursor/LSMBTreeBatchPointSearchCursorTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/cursor/LSMBTreeBatchPointSearchCursorTest.java
new file mode 100644
index 0000000..82aeed2
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/cursor/LSMBTreeBatchPointSearchCursorTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.cursor;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.accessors.IntegerBinaryComparatorFactory;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.utils.TupleUtils;
+import org.apache.hyracks.storage.am.btree.impls.BatchPredicate;
+import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
+import org.apache.hyracks.storage.am.common.TestOperationCallback;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
+import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.common.test.IIndexCursorTest;
+import org.apache.hyracks.storage.am.lsm.btree.LSMBTreeExamplesTest;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeBatchPointSearchCursor;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeOpContext;
+import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
+import org.apache.hyracks.storage.common.IIndexAccessor;
+import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.MultiComparator;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * Exercises {@link LSMBTreeBatchPointSearchCursor} through the cursor checks inherited from
+ * {@link IIndexCursorTest}.
+ * <p>
+ * A single LSM B-tree is created and populated once in {@link #setup()} and shared by every test in
+ * this class; {@link #teardown()} deactivates and destroys it.
+ */
+public class LSMBTreeBatchPointSearchCursorTest extends IIndexCursorTest {
+    public static final int FIELD_COUNT = 2;
+    public static final ITypeTraits[] TYPE_TRAITS = { IntegerPointable.TYPE_TRAITS, IntegerPointable.TYPE_TRAITS };
+    @SuppressWarnings("rawtypes")
+    public static final ISerializerDeserializer[] FIELD_SERDES =
+            { IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE };
+    public static final int KEY_FIELD_COUNT = 1;
+    public static final IBinaryComparatorFactory[] CMP_FACTORIES = { IntegerBinaryComparatorFactory.INSTANCE };
+    public static final int[] BLOOM_FILTER_KEY_FIELDS = { 0 };
+    // Fixed seed, so the generated key sequence is the same on every run.
+    public static final Random RND = new Random(50);
+
+    private static final LSMBTreeTestHarness harness = new LSMBTreeTestHarness();
+    private static LSMBTree lsmBtree;
+    // Assigned in createCursor() and read in open().
+    private static LSMBTreeOpContext opCtx;
+
+    /**
+     * Sets up the harness, then creates, activates and populates the shared LSM B-tree.
+     */
+    @BeforeClass
+    public static void setup() throws HyracksDataException {
+        harness.setUp();
+        lsmBtree = LSMBTreeExamplesTest.createTreeIndex(harness, TYPE_TRAITS, CMP_FACTORIES, BLOOM_FILTER_KEY_FIELDS,
+                null, null, null, null);
+        lsmBtree.create();
+        lsmBtree.activate();
+        insertData(lsmBtree);
+    }
+
+    /**
+     * Deactivates and destroys the shared index; the harness is torn down even if that fails.
+     */
+    @AfterClass
+    public static void teardown() throws HyracksDataException {
+        try {
+            lsmBtree.deactivate();
+            lsmBtree.destroy();
+        } finally {
+            harness.tearDown();
+        }
+    }
+
+    /**
+     * Builds the one predicate used by the inherited tests: a {@link BatchPredicate} over a single frame
+     * holding ten one-field integer keys (-100, -50, ..., 350), appended in ascending order.
+     *
+     * @return a singleton list containing that batch predicate
+     */
+    @Override
+    protected List<ISearchPredicate> createSearchPredicates() throws Exception {
+        IFrame frame = new VSizeFrame(harness.getHyracksTastContext());
+        FrameTupleAppender appender = new FrameTupleAppender();
+        appender.reset(frame, true);
+        MultiComparator keyCmp = null;
+        for (int i = 0; i < 10; i++) {
+            // Build the i-th search key: -100, -50, ..., 350.
+            ArrayTupleBuilder lowKeyTb = new ArrayTupleBuilder(KEY_FIELD_COUNT);
+            ArrayTupleReference lowKey = new ArrayTupleReference();
+            TupleUtils.createIntegerTuple(lowKeyTb, lowKey, -100 + (i * 50));
+            appender.append(lowKey);
+            // The search comparator is derived once, from the first key.
+            if (keyCmp == null) {
+                keyCmp = BTreeUtils.getSearchMultiComparator(CMP_FACTORIES, lowKey);
+            }
+        }
+        // Describe the frame's tuples with just the first KEY_FIELD_COUNT serdes: the frame holds key-only tuples.
+        IFrameTupleAccessor accessor =
+                new FrameTupleAccessor(new RecordDescriptor(Arrays.copyOf(FIELD_SERDES, KEY_FIELD_COUNT)));
+        accessor.reset(frame.getBuffer());
+        BatchPredicate predicate = new BatchPredicate(accessor, keyCmp, null, null, null);
+        return Collections.singletonList(predicate);
+    }
+
+    /**
+     * Creates a fresh operation context on the shared index, stores it in the static {@code opCtx} field
+     * for later use by {@code open}, and returns a new batch point search cursor built on it.
+     *
+     * @param accessor not used by this implementation
+     */
+    @Override
+    protected IIndexCursor createCursor(IIndexAccessor accessor) {
+        opCtx = lsmBtree.createOpContext(NoOpIndexAccessParameters.INSTANCE);
+        return new LSMBTreeBatchPointSearchCursor(opCtx);
+    }
+
+    /**
+     * Prepares the shared operation context for a search and opens the cursor on its initial state.
+     * Relies on {@code createCursor} having been called first, since that is where {@code opCtx} is set.
+     *
+     * @param accessor not used by this implementation
+     * @param cursor the cursor to open
+     * @param predicate the predicate to search with
+     */
+    @Override
+    protected void open(IIndexAccessor accessor, IIndexCursor cursor, ISearchPredicate predicate)
+            throws HyracksDataException {
+        opCtx.reset();
+        opCtx.setOperation(IndexOperation.SEARCH);
+        lsmBtree.getOperationalComponents(opCtx);
+        opCtx.getSearchInitialState().reset(predicate, opCtx.getComponentHolder());
+        cursor.open(opCtx.getSearchInitialState(), predicate);
+    }
+
+    /**
+     * Returns a new accessor on the shared index, created with no-op access parameters.
+     */
+    @Override
+    protected IIndexAccessor createAccessor() throws Exception {
+        return lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+    }
+
+    /**
+     * Populates the given index with up to 10000 two-field integer tuples {@code (key, 5)}.
+     * <p>
+     * Keys are drawn as {@code RND.nextInt() % 10000}, so they lie strictly between -10000 and 10000 and
+     * can repeat; an insert rejected as a duplicate key is skipped and any other failure is rethrown.
+     *
+     * @param lsmBtree the index to insert into
+     * @throws HyracksDataException if an insert fails for a reason other than a duplicate key
+     */
+    public static void insertData(ITreeIndex lsmBtree) throws HyracksDataException {
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(FIELD_COUNT);
+        ArrayTupleReference tuple = new ArrayTupleReference();
+        IndexAccessParameters actx =
+                new IndexAccessParameters(TestOperationCallback.INSTANCE, TestOperationCallback.INSTANCE);
+        IIndexAccessor indexAccessor = lsmBtree.createAccessor(actx);
+        try {
+            int numInserts = 10000;
+            for (int i = 0; i < numInserts; i++) {
+                int f0 = RND.nextInt() % numInserts;
+                int f1 = 5;
+                TupleUtils.createIntegerTuple(tb, tuple, f0, f1);
+                try {
+                    indexAccessor.insert(tuple);
+                } catch (HyracksDataException e) {
+                    // Random keys can collide: a duplicate is expected and skipped, while any other
+                    // failure propagates to the caller.
+                    if (e.getErrorCode() != ErrorCode.DUPLICATE_KEY) {
+                        throw e;
+                    }
+                }
+            }
+        } finally {
+            indexAccessor.destroy();
+        }
+    }
+}