[ASTERIXDB-2205][STO] Maintain includeMutableComponent correctly

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- This change fixes ASTERIXDB-2205. The root cause
  for ASTERIXDB-2205 was that the value of
  includeMutableComponent is not maintained correctly.

Change-Id: Ic08a9372c608d6de960e1419899530aa55aa72e0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2243
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 3e14fb9..876bc6d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -105,53 +105,46 @@
         while (!outputPriorityQueue.isEmpty() || needPushElementIntoQueue) {
             if (!outputPriorityQueue.isEmpty()) {
                 PriorityQueueElement queueHead = outputPriorityQueue.peek();
-                if (canCallProceed) {
-                    // if there are no memory components. no need to lock at all
-                    // since whatever the search reads will never changes
-                    if (includeMutableComponent) {
-                        if (!searchCallback.proceed(queueHead.getTuple())) {
-                            // In case proceed() fails and there is an in-memory component,
-                            // we can't simply use this element since there might be a change.
-                            PriorityQueueElement mutableElement = remove(outputPriorityQueue, 0);
-                            if (mutableElement != null) {
-                                // Copies the current queue head
-                                if (tupleBuilder == null) {
-                                    tupleBuilder = new ArrayTupleBuilder(cmp.getKeyFieldCount());
-                                }
-                                TupleUtils.copyTuple(tupleBuilder, queueHead.getTuple(), cmp.getKeyFieldCount());
-                                copyTuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
-                                // Unlatches/unpins the leaf page of the index.
-                                rangeCursors[0].reset();
-                                // Reconcile.
-                                searchCallback.reconcile(copyTuple);
-                                // Re-traverses the index.
-                                reusablePred.setLowKey(copyTuple, true);
-                                btreeAccessors[0].search(rangeCursors[0], reusablePred);
-                                //------
-                                includeMutableComponent = pushIntoQueueFromCursorAndReplaceThisElement(mutableElement);
-                                // now that we have completed the search and we have latches over the pages,
-                                // it is safe to complete the operation.. but as per the API of the callback
-                                // we only complete if we're producing this tuple
-                                // get head again
-                                queueHead = outputPriorityQueue.peek();
-                                /*
-                                 * We need to restart in one of two cases:
-                                 * 1. no more elements in the priority queue.
-                                 * 2. the key of the head has changed (which means we need to call proceed)
-                                 */
-                                if (queueHead == null || cmp.compare(copyTuple, queueHead.getTuple()) != 0) {
-                                    // cancel since we're not continuing
-                                    searchCallback.cancel(copyTuple);
-                                    continue;
-                                }
-                                searchCallback.complete(copyTuple);
-                                // it is safe to proceed now
-                            } else {
-                                // There are no more elements in the memory component.. can safely skip locking for the
-                                // remaining operations
-                                includeMutableComponent = false;
-                            }
+                if (canCallProceed && includeMutableComponent && !searchCallback.proceed(queueHead.getTuple())) {
+                    // In case proceed() fails and there is an in-memory component,
+                    // we can't simply use this element since there might be a change.
+                    PriorityQueueElement mutableElement = remove(outputPriorityQueue, 0);
+                    if (mutableElement != null) {
+                        // Copies the current queue head
+                        if (tupleBuilder == null) {
+                            tupleBuilder = new ArrayTupleBuilder(cmp.getKeyFieldCount());
                         }
+                        TupleUtils.copyTuple(tupleBuilder, queueHead.getTuple(), cmp.getKeyFieldCount());
+                        copyTuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+                        // Unlatches/unpins the leaf page of the index.
+                        rangeCursors[0].reset();
+                        // Reconcile.
+                        searchCallback.reconcile(copyTuple);
+                        // Re-traverses the index.
+                        reusablePred.setLowKey(copyTuple, true);
+                        btreeAccessors[0].search(rangeCursors[0], reusablePred);
+                        pushIntoQueueFromCursorAndReplaceThisElement(mutableElement);
+                        // now that we have completed the search and we have latches over the pages,
+                        // it is safe to complete the operation.. but as per the API of the callback
+                        // we only complete if we're producing this tuple
+                        // get head again
+                        queueHead = outputPriorityQueue.peek();
+                        /*
+                         * We need to restart in one of two cases:
+                         * 1. no more elements in the priority queue.
+                         * 2. the key of the head has changed (which means we need to call proceed)
+                         */
+                        if (queueHead == null || cmp.compare(copyTuple, queueHead.getTuple()) != 0) {
+                            // cancel since we're not continuing
+                            searchCallback.cancel(copyTuple);
+                            continue;
+                        }
+                        searchCallback.complete(copyTuple);
+                        // it is safe to proceed now
+                    } else {
+                        // There are no more elements in the memory component.. can safely skip locking for the
+                        // remaining operations
+                        includeMutableComponent = false;
                     }
                 }
 
@@ -217,19 +210,21 @@
         opCtx.getIndex().getHarness().replaceMemoryComponentsWithDiskComponents(getOpCtx(), replaceFrom);
         // redo the search on the new component
         for (int i = replaceFrom; i < switchRequest.length; i++) {
-            if (switchRequest[i] && switchedElements[i] != null) {
-                copyTuple.reset(switchComponentTupleBuilders[i].getFieldEndOffsets(),
-                        switchComponentTupleBuilders[i].getByteArray());
-                reusablePred.setLowKey(copyTuple, true);
-                rangeCursors[i].reset();
+            if (switchRequest[i]) {
                 ILSMComponent component = operationalComponents.get(i);
                 BTree btree = (BTree) component.getIndex();
                 if (i == 0 && component.getType() != LSMComponentType.MEMORY) {
                     includeMutableComponent = false;
                 }
-                btreeAccessors[i].reset(btree, NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-                btreeAccessors[i].search(rangeCursors[i], reusablePred);
-                pushIntoQueueFromCursorAndReplaceThisElement(switchedElements[i]);
+                if (switchedElements[i] != null) {
+                    copyTuple.reset(switchComponentTupleBuilders[i].getFieldEndOffsets(),
+                            switchComponentTupleBuilders[i].getByteArray());
+                    reusablePred.setLowKey(copyTuple, true);
+                    rangeCursors[i].reset();
+                    btreeAccessors[i].reset(btree, NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+                    btreeAccessors[i].search(rangeCursors[i], reusablePred);
+                    pushIntoQueueFromCursorAndReplaceThisElement(switchedElements[i]);
+                }
             }
             switchRequest[i] = false;
             // any failed switch makes further switches pointless
@@ -305,7 +300,7 @@
                 // Re-traverses the index.
                 reusablePred.setLowKey(copyTuple, true);
                 btreeAccessors[0].search(rangeCursors[0], reusablePred);
-                includeMutableComponent = pushIntoQueueFromCursorAndReplaceThisElement(mutableElement);
+                pushIntoQueueFromCursorAndReplaceThisElement(mutableElement);
             }
         }
         tupleFromMemoryComponentCount = 0;
@@ -354,13 +349,11 @@
                 // re-use
                 rangeCursors[i].reset();
             }
+
             if (component.getType() == LSMComponentType.MEMORY) {
                 includeMutableComponent = true;
-                btree = (BTree) component.getIndex();
-            } else {
-                btree = (BTree) component.getIndex();
             }
-
+            btree = (BTree) component.getIndex();
             if (btreeAccessors[i] == null) {
                 btreeAccessors[i] = btree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             } else {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
index 17c681c..e37669e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
@@ -194,16 +194,18 @@
         return filter == null ? null : filter.getMaxTuple();
     }
 
-    protected boolean pushIntoQueueFromCursorAndReplaceThisElement(PriorityQueueElement e) throws HyracksDataException {
+    protected void pushIntoQueueFromCursorAndReplaceThisElement(PriorityQueueElement e) throws HyracksDataException {
         int cursorIndex = e.getCursorIndex();
         if (rangeCursors[cursorIndex].hasNext()) {
             rangeCursors[cursorIndex].next();
             e.reset(rangeCursors[cursorIndex].getTuple());
             outputPriorityQueue.offer(e);
-            return true;
+            return;
         }
         rangeCursors[cursorIndex].close();
-        return false;
+        if (cursorIndex == 0) {
+            includeMutableComponent = false;
+        }
     }
 
     protected boolean isDeleted(PriorityQueueElement checkElement) throws HyracksDataException {
@@ -324,5 +326,4 @@
             throws HyracksDataException {
         return cmp.compare(tupleA, tupleB);
     }
-
 }