fixed LSM search cursor to properly release mem component latches during search opcallback sequence
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1761 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeRangeSearchCursor.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeRangeSearchCursor.java
index 183af6e..d50a2c8 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeRangeSearchCursor.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeRangeSearchCursor.java
@@ -194,6 +194,8 @@
if (originalKeyCmp.compare(reconciliationTuple, frameTuple) == 0) {
if (highKey == null || tupleIndex <= stopTupleIndex) {
return true;
+ } else {
+ return false;
}
} else { // otherwise do the opCallback dance again with the new tuple we found
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 7f6e511..fe86a2f 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -328,7 +328,8 @@
int numDiskBTrees = diskComponents.size();
int numBTrees = (includeMemComponent) ? numDiskBTrees + 1 : numDiskBTrees;
LSMBTreeCursorInitialState initialState = new LSMBTreeCursorInitialState(numBTrees, insertLeafFrameFactory,
- ctx.cmp, includeMemComponent, searcherRefCount, lsmHarness, null);
+ ctx.cmp, includeMemComponent, searcherRefCount, lsmHarness, memBTree.createAccessor(
+ NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE), pred, ctx.searchCallback);
lsmTreeCursor.open(initialState, pred);
int cursorIx;
@@ -347,7 +348,8 @@
ListIterator<Object> diskBTreesIter = diskComponents.listIterator();
while (diskBTreesIter.hasNext()) {
BTree diskBTree = (BTree) diskBTreesIter.next();
- diskBTreeAccessors[diskBTreeIx] = diskBTree.createAccessor(ctx.modificationCallback, ctx.searchCallback);
+ diskBTreeAccessors[diskBTreeIx] = diskBTree.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
diskBTreeAccessors[diskBTreeIx].search(lsmTreeCursor.getCursor(cursorIx), pred);
cursorIx++;
diskBTreeIx++;
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java
index 9469e91..329c412 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java
@@ -18,7 +18,9 @@
import java.util.concurrent.atomic.AtomicInteger;
import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
@@ -33,11 +35,13 @@
private final AtomicInteger searcherfRefCount;
private final LSMHarness lsmHarness;
+ private final IIndexAccessor memBtreeAccessor;
+ private final ISearchPredicate predicate;
private ISearchOperationCallback searchCallback;
public LSMBTreeCursorInitialState(int numBTrees, ITreeIndexFrameFactory leafFrameFactory, MultiComparator cmp,
boolean includeMemComponent, AtomicInteger searcherfRefCount, LSMHarness lsmHarness,
- ISearchOperationCallback searchCallback) {
+ IIndexAccessor memBtreeAccessor, ISearchPredicate predicate, ISearchOperationCallback searchCallback) {
this.numBTrees = numBTrees;
this.leafFrameFactory = leafFrameFactory;
this.cmp = cmp;
@@ -45,6 +49,8 @@
this.searcherfRefCount = searcherfRefCount;
this.lsmHarness = lsmHarness;
this.searchCallback = searchCallback;
+ this.memBtreeAccessor = memBtreeAccessor;
+ this.predicate = predicate;
}
public int getNumBTrees() {
@@ -90,6 +96,14 @@
this.searchCallback = searchCallback;
}
+ public IIndexAccessor getMemBTreeAccessor() {
+ return memBtreeAccessor;
+ }
+
+ public ISearchPredicate getSearchPredicate() {
+ return predicate;
+ }
+
@Override
public MultiComparator getOriginalKeyComparator() {
return cmp;
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index f7e3aae..0262a2e 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -16,21 +16,27 @@
package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
import java.util.Comparator;
+import java.util.Iterator;
import java.util.PriorityQueue;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
-import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeSearchCursor;
public class LSMBTreeRangeSearchCursor extends LSMTreeSearchCursor {
private PriorityQueueComparator pqCmp;
- private ISearchOperationCallback searchCallback;
+ private IIndexAccessor memBTreeAccessor;
+ private RangePredicate predicate;
+ private RangePredicate reusablePred = new RangePredicate(null, null, true, true, null, null);
public LSMBTreeRangeSearchCursor() {
outputElement = null;
@@ -52,9 +58,86 @@
}
@Override
+ public boolean hasNext() throws HyracksDataException {
+ checkPriorityQueue();
+ PriorityQueueElement pqHead = outputPriorityQueue.peek();
+ if (pqHead == null) {
+ // pq is empty
+ return false;
+ }
+
+ if (searchCallback.proceed(pqHead.getTuple())) {
+ // if proceed is successful, then there's no need for doing the "unlatch dance"
+ return true;
+ }
+
+ if (includeMemComponent) {
+ PriorityQueueElement inMemElement = null;
+ boolean inMemElementFound = false;
+
+ // scan the PQ for the in-memory component's element
+ if (outputElement != null && outputElement.getCursorIndex() == 0) {
+ inMemElement = outputElement;
+ } else {
+ Iterator<PriorityQueueElement> it = outputPriorityQueue.iterator();
+ while (it.hasNext()) {
+ inMemElement = it.next();
+ if (inMemElement.getCursorIndex() == 0) {
+ inMemElementFound = true;
+ outputPriorityQueue.remove(inMemElement);
+ break;
+ }
+ }
+ }
+
+ if (!inMemElementFound && inMemElement != null) {
+ searchCallback.reconcile(pqHead.getTuple());
+ return true;
+ }
+
+ // copy the in-mem tuple
+ ITupleReference inMemTuple = TupleUtils.copyTuple(inMemElement.getTuple());
+ // unlatch
+ rangeCursors[0].reset();
+ // reconcile
+ if (pqHead.getCursorIndex() == 0) {
+ searchCallback.reconcile(inMemTuple);
+ } else {
+ searchCallback.reconcile(pqHead.getTuple());
+ }
+
+ reusablePred.setLowKey(inMemTuple, true);
+ reusablePred.setLowKeyComparator(cmp);
+ reusablePred.setHighKey(predicate.getHighKey(), predicate.isHighKeyInclusive());
+ reusablePred.setHighKeyComparator(predicate.getHighKeyComparator());
+ try {
+ memBTreeAccessor.search(rangeCursors[0], reusablePred);
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
+ }
+
+ // todo: make lsmbtreetuplereference copy
+ if (rangeCursors[0].hasNext()) {
+ rangeCursors[0].next();
+ inMemElement.reset(rangeCursors[0].getTuple(), 0);
+ if (inMemElementFound) {
+ outputPriorityQueue.offer(inMemElement);
+ }
+ } else {
+ rangeCursors[0].close();
+ }
+ } else {
+ searchCallback.reconcile(pqHead.getTuple());
+ }
+
+ return true;
+ // return !outputPriorityQueue.isEmpty();
+ }
+
+ @Override
public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
+ super.open(initialState, searchPred);
LSMBTreeCursorInitialState lsmInitialState = (LSMBTreeCursorInitialState) initialState;
- searchCallback = lsmInitialState.getSearchOperationCallback();
cmp = lsmInitialState.getCmp();
int numBTrees = lsmInitialState.getNumBTrees();
rangeCursors = new BTreeRangeSearchCursor[numBTrees];
@@ -65,6 +148,8 @@
includeMemComponent = lsmInitialState.getIncludeMemComponent();
searcherRefCount = lsmInitialState.getSearcherRefCount();
lsmHarness = lsmInitialState.getLSMHarness();
+ memBTreeAccessor = lsmInitialState.getMemBTreeAccessor();
+ predicate = (RangePredicate) lsmInitialState.getSearchPredicate();
setPriorityQueueComparator();
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 66ea185..6e22d53 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -185,6 +185,13 @@
return;
}
+ // TODO: Move this to just before the merge cleanup and remove latching on disk components
+ // The implementation of this call must take any necessary steps to make
+ // the new component permanent, and mark it as valid (usually this means
+ // forcing all pages of the tree to disk, possibly with some extra
+ // information to mark the tree as valid).
+ lsmIndex.getComponentFinalizer().finalize(newComponent);
+
// Remove the old Trees from the list, and add the new merged Tree(s).
// Also, swap the searchRefCount.
synchronized (diskComponentsSync) {
@@ -210,12 +217,6 @@
}
}
- // The implementation of this call must take any necessary steps to make
- // the new component permanent, and mark it as valid (usually this means
- // forcing all pages of the tree to disk, possibly with some extra
- // information to mark the tree as valid).
- lsmIndex.getComponentFinalizer().finalize(newComponent);
-
// Cleanup. At this point we have guaranteed that no searchers are
// touching the old on-disk Trees (localSearcherRefCount == 0).
lsmIndex.cleanUpAfterMerge(mergedComponents);
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeSearchCursor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeSearchCursor.java
index f622d87..70f1c0f 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeSearchCursor.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeSearchCursor.java
@@ -20,6 +20,9 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMTreeTupleReference;
@@ -37,6 +40,7 @@
protected boolean includeMemComponent;
protected AtomicInteger searcherRefCount;
protected LSMHarness lsmHarness;
+ protected ISearchOperationCallback searchCallback;
public LSMTreeSearchCursor() {
outputElement = null;
@@ -133,8 +137,7 @@
while (!outputPriorityQueue.isEmpty() || needPush == true) {
if (!outputPriorityQueue.isEmpty()) {
PriorityQueueElement checkElement = outputPriorityQueue.peek();
- // If there is no previous tuple or the previous tuple can be
- // ignored
+ // If there is no previous tuple or the previous tuple can be ignored
if (outputElement == null) {
// Test the tuple is a delete tuple or not
if (((ILSMTreeTupleReference) checkElement.getTuple()).isAntimatter() == true) {
@@ -160,8 +163,7 @@
// int treeNum = reusedElement.getTreeNum();
pushIntoPriorityQueue(reusedElement.getCursorIndex());
} else {
- // If the previous tuple and the head tuple are
- // different
+ // If the previous tuple and the head tuple are different
// the info of previous tuple is useless
if (needPush == true) {
reusedElement = outputElement;
@@ -207,4 +209,9 @@
this.cursorIndex = cursorIndex;
}
}
+
+ @Override
+ public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
+ searchCallback = initialState.getSearchOperationCallback();
+ }
}