[ASTERIXDB-2205][STO] Maintain includeMutableComponent correctly
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- This change fixes ASTERIXDB-2205. The root cause
for ASTERIXDB-2205 was that the value of
includeMutableComponent is not maintained correctly.
Change-Id: Ic08a9372c608d6de960e1419899530aa55aa72e0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2243
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 3e14fb9..876bc6d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -105,53 +105,46 @@
while (!outputPriorityQueue.isEmpty() || needPushElementIntoQueue) {
if (!outputPriorityQueue.isEmpty()) {
PriorityQueueElement queueHead = outputPriorityQueue.peek();
- if (canCallProceed) {
- // if there are no memory components. no need to lock at all
- // since whatever the search reads will never changes
- if (includeMutableComponent) {
- if (!searchCallback.proceed(queueHead.getTuple())) {
- // In case proceed() fails and there is an in-memory component,
- // we can't simply use this element since there might be a change.
- PriorityQueueElement mutableElement = remove(outputPriorityQueue, 0);
- if (mutableElement != null) {
- // Copies the current queue head
- if (tupleBuilder == null) {
- tupleBuilder = new ArrayTupleBuilder(cmp.getKeyFieldCount());
- }
- TupleUtils.copyTuple(tupleBuilder, queueHead.getTuple(), cmp.getKeyFieldCount());
- copyTuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
- // Unlatches/unpins the leaf page of the index.
- rangeCursors[0].reset();
- // Reconcile.
- searchCallback.reconcile(copyTuple);
- // Re-traverses the index.
- reusablePred.setLowKey(copyTuple, true);
- btreeAccessors[0].search(rangeCursors[0], reusablePred);
- //------
- includeMutableComponent = pushIntoQueueFromCursorAndReplaceThisElement(mutableElement);
- // now that we have completed the search and we have latches over the pages,
- // it is safe to complete the operation.. but as per the API of the callback
- // we only complete if we're producing this tuple
- // get head again
- queueHead = outputPriorityQueue.peek();
- /*
- * We need to restart in one of two cases:
- * 1. no more elements in the priority queue.
- * 2. the key of the head has changed (which means we need to call proceed)
- */
- if (queueHead == null || cmp.compare(copyTuple, queueHead.getTuple()) != 0) {
- // cancel since we're not continuing
- searchCallback.cancel(copyTuple);
- continue;
- }
- searchCallback.complete(copyTuple);
- // it is safe to proceed now
- } else {
- // There are no more elements in the memory component.. can safely skip locking for the
- // remaining operations
- includeMutableComponent = false;
- }
+ if (canCallProceed && includeMutableComponent && !searchCallback.proceed(queueHead.getTuple())) {
+ // In case proceed() fails and there is an in-memory component,
+ // we can't simply use this element since there might be a change.
+ PriorityQueueElement mutableElement = remove(outputPriorityQueue, 0);
+ if (mutableElement != null) {
+ // Copies the current queue head
+ if (tupleBuilder == null) {
+ tupleBuilder = new ArrayTupleBuilder(cmp.getKeyFieldCount());
}
+ TupleUtils.copyTuple(tupleBuilder, queueHead.getTuple(), cmp.getKeyFieldCount());
+ copyTuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+ // Unlatches/unpins the leaf page of the index.
+ rangeCursors[0].reset();
+ // Reconcile.
+ searchCallback.reconcile(copyTuple);
+ // Re-traverses the index.
+ reusablePred.setLowKey(copyTuple, true);
+ btreeAccessors[0].search(rangeCursors[0], reusablePred);
+ pushIntoQueueFromCursorAndReplaceThisElement(mutableElement);
+ // now that we have completed the search and we have latches over the pages,
+ // it is safe to complete the operation.. but as per the API of the callback
+ // we only complete if we're producing this tuple
+ // get head again
+ queueHead = outputPriorityQueue.peek();
+ /*
+ * We need to restart in one of two cases:
+ * 1. no more elements in the priority queue.
+ * 2. the key of the head has changed (which means we need to call proceed)
+ */
+ if (queueHead == null || cmp.compare(copyTuple, queueHead.getTuple()) != 0) {
+ // cancel since we're not continuing
+ searchCallback.cancel(copyTuple);
+ continue;
+ }
+ searchCallback.complete(copyTuple);
+ // it is safe to proceed now
+ } else {
+ // There are no more elements in the memory component.. can safely skip locking for the
+ // remaining operations
+ includeMutableComponent = false;
}
}
@@ -217,19 +210,21 @@
opCtx.getIndex().getHarness().replaceMemoryComponentsWithDiskComponents(getOpCtx(), replaceFrom);
// redo the search on the new component
for (int i = replaceFrom; i < switchRequest.length; i++) {
- if (switchRequest[i] && switchedElements[i] != null) {
- copyTuple.reset(switchComponentTupleBuilders[i].getFieldEndOffsets(),
- switchComponentTupleBuilders[i].getByteArray());
- reusablePred.setLowKey(copyTuple, true);
- rangeCursors[i].reset();
+ if (switchRequest[i]) {
ILSMComponent component = operationalComponents.get(i);
BTree btree = (BTree) component.getIndex();
if (i == 0 && component.getType() != LSMComponentType.MEMORY) {
includeMutableComponent = false;
}
- btreeAccessors[i].reset(btree, NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- btreeAccessors[i].search(rangeCursors[i], reusablePred);
- pushIntoQueueFromCursorAndReplaceThisElement(switchedElements[i]);
+ if (switchedElements[i] != null) {
+ copyTuple.reset(switchComponentTupleBuilders[i].getFieldEndOffsets(),
+ switchComponentTupleBuilders[i].getByteArray());
+ reusablePred.setLowKey(copyTuple, true);
+ rangeCursors[i].reset();
+ btreeAccessors[i].reset(btree, NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ btreeAccessors[i].search(rangeCursors[i], reusablePred);
+ pushIntoQueueFromCursorAndReplaceThisElement(switchedElements[i]);
+ }
}
switchRequest[i] = false;
// any failed switch makes further switches pointless
@@ -305,7 +300,7 @@
// Re-traverses the index.
reusablePred.setLowKey(copyTuple, true);
btreeAccessors[0].search(rangeCursors[0], reusablePred);
- includeMutableComponent = pushIntoQueueFromCursorAndReplaceThisElement(mutableElement);
+ pushIntoQueueFromCursorAndReplaceThisElement(mutableElement);
}
}
tupleFromMemoryComponentCount = 0;
@@ -354,13 +349,11 @@
// re-use
rangeCursors[i].reset();
}
+
if (component.getType() == LSMComponentType.MEMORY) {
includeMutableComponent = true;
- btree = (BTree) component.getIndex();
- } else {
- btree = (BTree) component.getIndex();
}
-
+ btree = (BTree) component.getIndex();
if (btreeAccessors[i] == null) {
btreeAccessors[i] = btree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
} else {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
index 17c681c..e37669e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
@@ -194,16 +194,18 @@
return filter == null ? null : filter.getMaxTuple();
}
- protected boolean pushIntoQueueFromCursorAndReplaceThisElement(PriorityQueueElement e) throws HyracksDataException {
+ protected void pushIntoQueueFromCursorAndReplaceThisElement(PriorityQueueElement e) throws HyracksDataException {
int cursorIndex = e.getCursorIndex();
if (rangeCursors[cursorIndex].hasNext()) {
rangeCursors[cursorIndex].next();
e.reset(rangeCursors[cursorIndex].getTuple());
outputPriorityQueue.offer(e);
- return true;
+ return;
}
rangeCursors[cursorIndex].close();
- return false;
+ if (cursorIndex == 0) {
+ includeMutableComponent = false;
+ }
}
protected boolean isDeleted(PriorityQueueElement checkElement) throws HyracksDataException {
@@ -324,5 +326,4 @@
throws HyracksDataException {
return cmp.compare(tupleA, tupleB);
}
-
}