Fixed the issue of not locking in-memory anti-mattered tuples in the LSM_BTree range cursor which may result in reading un-committed data.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@2931 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
index d1aa6a8..f3a5c82 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
@@ -75,6 +75,7 @@
if (reconciled || searchCallback.proceed(predicate.getLowKey())) {
// if proceed is successful, then there's no need for doing the "unlatch dance"
if (((ILSMTreeTupleReference) rangeCursors[i].getTuple()).isAntimatter()) {
+ searchCallback.cancel(predicate.getLowKey());
return false;
} else {
frameTuple = rangeCursors[i].getTuple();
@@ -97,6 +98,7 @@
if (rangeCursors[i].hasNext()) {
rangeCursors[i].next();
if (((ILSMTreeTupleReference) rangeCursors[i].getTuple()).isAntimatter()) {
+ searchCallback.cancel(predicate.getLowKey());
return false;
} else {
frameTuple = rangeCursors[i].getTuple();
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index ff01ce8..1d0310e 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -46,6 +46,7 @@
private RangePredicate predicate;
private IIndexAccessor memBTreeAccessor;
private ArrayTupleBuilder tupleBuilder;
+ private boolean proceed = true;
public LSMBTreeRangeSearchCursor(ILSMIndexOperationContext opCtx) {
super(opCtx);
@@ -54,82 +55,116 @@
}
@Override
- public boolean hasNext() throws HyracksDataException, IndexException {
- checkPriorityQueue();
- PriorityQueueElement pqHead = outputPriorityQueue.peek();
- if (pqHead == null) {
- // PQ is empty
- return false;
- }
+ public void reset() throws HyracksDataException, IndexException {
+ super.reset();
+ proceed = true;
+ }
- assert outputElement == null;
+ @Override
+ public void next() throws HyracksDataException {
+ outputElement = outputPriorityQueue.poll();
+ needPush = true;
+ proceed = false;
+ }
- if (searchCallback.proceed(pqHead.getTuple())) {
- // if proceed is successful, then there's no need for doing the "unlatch dance"
- return true;
- }
+ protected void checkPriorityQueue() throws HyracksDataException, IndexException {
+ while (!outputPriorityQueue.isEmpty() || needPush == true) {
+ if (!outputPriorityQueue.isEmpty()) {
+ PriorityQueueElement checkElement = outputPriorityQueue.peek();
+ if (proceed && !searchCallback.proceed(checkElement.getTuple())) {
+ if (includeMemComponent) {
+ PriorityQueueElement inMemElement = null;
+ boolean inMemElementFound = false;
+ // scan the PQ for the in-memory component's element
+ Iterator<PriorityQueueElement> it = outputPriorityQueue.iterator();
+ while (it.hasNext()) {
+ inMemElement = it.next();
+ if (inMemElement.getCursorIndex() == 0) {
+ inMemElementFound = true;
+ it.remove();
+ break;
+ }
+ }
+ if (inMemElementFound) {
+ // copy the in-mem tuple
+ if (tupleBuilder == null) {
+ tupleBuilder = new ArrayTupleBuilder(cmp.getKeyFieldCount());
+ }
+ TupleUtils.copyTuple(tupleBuilder, inMemElement.getTuple(), cmp.getKeyFieldCount());
+ copyTuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
- if (includeMemComponent) {
- PriorityQueueElement inMemElement = null;
- boolean inMemElementFound = false;
+ // unlatch/unpin
+ rangeCursors[0].reset();
- // scan the PQ for the in-memory component's element
- Iterator<PriorityQueueElement> it = outputPriorityQueue.iterator();
- while (it.hasNext()) {
- inMemElement = it.next();
- if (inMemElement.getCursorIndex() == 0) {
- inMemElementFound = true;
- it.remove();
- break;
+ // reconcile
+ if (checkElement.getCursorIndex() == 0) {
+ searchCallback.reconcile(copyTuple);
+ } else {
+ searchCallback.reconcile(checkElement.getTuple());
+ }
+ // retraverse
+ reusablePred.setLowKey(copyTuple, true);
+ try {
+ memBTreeAccessor.search(rangeCursors[0], reusablePred);
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
+ }
+ pushIntoPriorityQueue(inMemElement);
+ if (cmp.compare(copyTuple, inMemElement.getTuple()) != 0) {
+ searchCallback.cancel(copyTuple);
+ continue;
+ }
+ } else {
+ // the in-memory cursor is exhausted
+ searchCallback.reconcile(checkElement.getTuple());
+ }
+ } else {
+ searchCallback.reconcile(checkElement.getTuple());
+ }
}
- }
+ // If there is no previous tuple or the previous tuple can be ignored
+ if (outputElement == null) {
+ if (isDeleted(checkElement)) {
+ // If the key has been deleted then pop it and set needPush to true.
+ // We cannot push immediately because the tuple may be
+ // modified if hasNext() is called
+ outputElement = outputPriorityQueue.poll();
+ searchCallback.cancel(checkElement.getTuple());
+ needPush = true;
+ proceed = false;
+ } else {
+ break;
+ }
+ } else {
+ // Compare the previous tuple and the head tuple in the PQ
+ if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
+ // If the previous tuple and the head tuple are
+ // identical
+ // then pop the head tuple and push the next tuple from
+ // the tree of head tuple
- if (!inMemElementFound) {
- // the in-memory cursor is exhausted
- searchCallback.reconcile(pqHead.getTuple());
- return true;
- }
-
- // copy the in-mem tuple
- if (tupleBuilder == null) {
- tupleBuilder = new ArrayTupleBuilder(cmp.getKeyFieldCount());
- }
- TupleUtils.copyTuple(tupleBuilder, inMemElement.getTuple(), cmp.getKeyFieldCount());
- copyTuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
-
- // unlatch/unpin
- rangeCursors[0].reset();
-
- // reconcile
- if (pqHead.getCursorIndex() == 0) {
- searchCallback.reconcile(copyTuple);
+ // the head element of PQ is useless now
+ PriorityQueueElement e = outputPriorityQueue.poll();
+ pushIntoPriorityQueue(e);
+ } else {
+ // If the previous tuple and the head tuple are different
+ // the info of previous tuple is useless
+ if (needPush == true) {
+ pushIntoPriorityQueue(outputElement);
+ needPush = false;
+ }
+ proceed = true;
+ outputElement = null;
+ }
+ }
} else {
- searchCallback.reconcile(pqHead.getTuple());
+ // the priority queue is empty and needPush
+ pushIntoPriorityQueue(outputElement);
+ needPush = false;
+ outputElement = null;
+ proceed = true;
}
-
- // retraverse
- reusablePred.setLowKey(copyTuple, true);
- try {
- memBTreeAccessor.search(rangeCursors[0], reusablePred);
- } catch (IndexException e) {
- throw new HyracksDataException(e);
- }
-
- if (!pushIntoPriorityQueue(inMemElement)) {
- return !outputPriorityQueue.isEmpty();
- }
-
- if (pqHead.getCursorIndex() == 0) {
- if (cmp.compare(copyTuple, inMemElement.getTuple()) != 0) {
- searchCallback.cancel(copyTuple);
- }
- }
- checkPriorityQueue();
- } else {
- searchCallback.reconcile(pqHead.getTuple());
}
-
- return true;
}
@Override
@@ -178,5 +213,6 @@
diskBTreeIx++;
}
initPriorityQueue();
+ proceed = true;
}
}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeSearchCursor.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeSearchCursor.java
index 82da312..6872520 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeSearchCursor.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeSearchCursor.java
@@ -92,6 +92,7 @@
if (currentCursor != null) {
currentCursor.close();
}
+ currentCursor = null;
}
@Override
@@ -99,6 +100,7 @@
if (currentCursor != null) {
currentCursor.reset();
}
+ currentCursor = null;
}
@Override
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
index 92d3679..9fc351a 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
@@ -151,50 +151,7 @@
return ((ILSMTreeTupleReference) checkElement.getTuple()).isAntimatter();
}
- protected void checkPriorityQueue() throws HyracksDataException, IndexException {
- while (!outputPriorityQueue.isEmpty() || needPush == true) {
- if (!outputPriorityQueue.isEmpty()) {
- PriorityQueueElement checkElement = outputPriorityQueue.peek();
- // If there is no previous tuple or the previous tuple can be ignored
- if (outputElement == null) {
- if (isDeleted(checkElement)) {
- // If the key has been deleted then pop it and set needPush to true.
- // We cannot push immediately because the tuple may be
- // modified if hasNext() is called
- outputElement = outputPriorityQueue.poll();
- needPush = true;
- } else {
- break;
- }
- } else {
- // Compare the previous tuple and the head tuple in the PQ
- if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
- // If the previous tuple and the head tuple are
- // identical
- // then pop the head tuple and push the next tuple from
- // the tree of head tuple
-
- // the head element of PQ is useless now
- PriorityQueueElement e = outputPriorityQueue.poll();
- pushIntoPriorityQueue(e);
- } else {
- // If the previous tuple and the head tuple are different
- // the info of previous tuple is useless
- if (needPush == true) {
- pushIntoPriorityQueue(outputElement);
- needPush = false;
- }
- outputElement = null;
- }
- }
- } else {
- // the priority queue is empty and needPush
- pushIntoPriorityQueue(outputElement);
- needPush = false;
- outputElement = null;
- }
- }
- }
+ abstract protected void checkPriorityQueue() throws HyracksDataException, IndexException;
@Override
public boolean exclusiveLatchNodes() {
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
index 259af5b..1b5949a 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
@@ -113,4 +113,50 @@
}
return false;
}
+
+ @Override
+ protected void checkPriorityQueue() throws HyracksDataException, IndexException {
+ while (!outputPriorityQueue.isEmpty() || needPush == true) {
+ if (!outputPriorityQueue.isEmpty()) {
+ PriorityQueueElement checkElement = outputPriorityQueue.peek();
+ // If there is no previous tuple or the previous tuple can be ignored
+ if (outputElement == null) {
+ if (isDeleted(checkElement)) {
+ // If the key has been deleted then pop it and set needPush to true.
+ // We cannot push immediately because the tuple may be
+ // modified if hasNext() is called
+ outputElement = outputPriorityQueue.poll();
+ needPush = true;
+ } else {
+ break;
+ }
+ } else {
+ // Compare the previous tuple and the head tuple in the PQ
+ if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
+ // If the previous tuple and the head tuple are
+ // identical
+ // then pop the head tuple and push the next tuple from
+ // the tree of head tuple
+
+ // the head element of PQ is useless now
+ PriorityQueueElement e = outputPriorityQueue.poll();
+ pushIntoPriorityQueue(e);
+ } else {
+ // If the previous tuple and the head tuple are different
+ // the info of previous tuple is useless
+ if (needPush == true) {
+ pushIntoPriorityQueue(outputElement);
+ needPush = false;
+ }
+ outputElement = null;
+ }
+ }
+ } else {
+ // the priority queue is empty and needPush
+ pushIntoPriorityQueue(outputElement);
+ needPush = false;
+ outputElement = null;
+ }
+ }
+ }
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
index f9dd819..8632150 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
@@ -212,4 +212,49 @@
}
}
+ @Override
+ protected void checkPriorityQueue() throws HyracksDataException, IndexException {
+ while (!outputPriorityQueue.isEmpty() || needPush == true) {
+ if (!outputPriorityQueue.isEmpty()) {
+ PriorityQueueElement checkElement = outputPriorityQueue.peek();
+ // If there is no previous tuple or the previous tuple can be ignored
+ if (outputElement == null) {
+ if (isDeleted(checkElement)) {
+ // If the key has been deleted then pop it and set needPush to true.
+ // We cannot push immediately because the tuple may be
+ // modified if hasNext() is called
+ outputElement = outputPriorityQueue.poll();
+ needPush = true;
+ } else {
+ break;
+ }
+ } else {
+ // Compare the previous tuple and the head tuple in the PQ
+ if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
+ // If the previous tuple and the head tuple are
+ // identical
+ // then pop the head tuple and push the next tuple from
+ // the tree of head tuple
+
+ // the head element of PQ is useless now
+ PriorityQueueElement e = outputPriorityQueue.poll();
+ pushIntoPriorityQueue(e);
+ } else {
+ // If the previous tuple and the head tuple are different
+ // the info of previous tuple is useless
+ if (needPush == true) {
+ pushIntoPriorityQueue(outputElement);
+ needPush = false;
+ }
+ outputElement = null;
+ }
+ }
+ } else {
+ // the priority queue is empty and needPush
+ pushIntoPriorityQueue(outputElement);
+ needPush = false;
+ outputElement = null;
+ }
+ }
+ }
}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
index f8ad0b2..9341f77 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
@@ -140,13 +140,11 @@
@Override
public boolean proceed(ITupleReference tuple) {
- Assert.assertEquals(0, cmp.compare(SearchTask.this.tuple, tuple));
return false;
}
@Override
public void reconcile(ITupleReference tuple) {
- Assert.assertEquals(0, cmp.compare(SearchTask.this.tuple, tuple));
if (blockOnHigh) {
try {
TupleUtils.createIntegerTuple(builder, SearchTask.this.tuple, expectedAfterBlock);