[ASTERIXDB-2012][TX] Fix Sporadic double release in btree search

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- LSMBTreeRangeSearchCursor sometimes unlocks twice producing
  illegal state exception. This happens if:
  1. the proceed call failed (causing instant lock to fail)
  2. the lock was acquired and then released. This happens if:
    a. the priority queue head was not coming from memory component.
    b. the priority queue head was from memory component and it didn't
       change when search was re-performed
  3. the tuple was found to be antimatter.

- Moreover, locks that are acquired in case the mutable component is not
  part of the search anymore are not released until the job completes.

Change-Id: I497ec7c14074390bd6408a3854202159bec5f1cf
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1912
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 3b8b160..29d7c60 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -20,6 +20,7 @@
 package org.apache.hyracks.storage.am.lsm.btree.impls;
 
 import java.util.Iterator;
+import java.util.PriorityQueue;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -49,7 +50,6 @@
     private BTreeAccessor[] btreeAccessors;
     private ArrayTupleBuilder tupleBuilder;
     private boolean canCallProceed = true;
-    private boolean resultOfSearchCallBackProceed = false;
 
     public LSMBTreeRangeSearchCursor(ILSMIndexOperationContext opCtx) {
         this(opCtx, false);
@@ -86,76 +86,55 @@
      */
     @Override
     protected void checkPriorityQueue() throws HyracksDataException {
-        while (!outputPriorityQueue.isEmpty() || needPushElementIntoQueue == true) {
+        while (!outputPriorityQueue.isEmpty() || needPushElementIntoQueue) {
             if (!outputPriorityQueue.isEmpty()) {
-                PriorityQueueElement checkElement = outputPriorityQueue.peek();
+                PriorityQueueElement queueHead = outputPriorityQueue.peek();
                 if (canCallProceed) {
-                    resultOfSearchCallBackProceed = searchCallback.proceed(checkElement.getTuple());
-                    if (!resultOfSearchCallBackProceed) {
-                        // In case proceed() fails and there is an in-memory component,
-                        // we can't simply use this element since there might be a change.
-                        if (includeMutableComponent) {
-                            PriorityQueueElement mutableElement = null;
-                            boolean mutableElementFound = false;
-                            // Scans the PQ for the mutable component's element and delete it
-                            // since it can be changed.
-                            // (i.e. we can't ensure that the element is the most current one.)
-                            Iterator<PriorityQueueElement> it = outputPriorityQueue.iterator();
-                            while (it.hasNext()) {
-                                mutableElement = it.next();
-                                if (mutableElement.getCursorIndex() == 0) {
-                                    mutableElementFound = true;
-                                    it.remove();
-                                    break;
-                                }
-                            }
-                            if (mutableElementFound) {
-                                // Copies the in-memory tuple.
+                    // if there are no memory components. no need to lock at all
+                    // since whatever the search reads will never changes
+                    if (includeMutableComponent) {
+                        if (!searchCallback.proceed(queueHead.getTuple())) {
+                            // In case proceed() fails and there is an in-memory component,
+                            // we can't simply use this element since there might be a change.
+                            PriorityQueueElement mutableElement = removeMutable(outputPriorityQueue);
+                            if (mutableElement != null) {
+                                // Copies the current queue head
                                 if (tupleBuilder == null) {
                                     tupleBuilder = new ArrayTupleBuilder(cmp.getKeyFieldCount());
                                 }
-                                TupleUtils.copyTuple(tupleBuilder, mutableElement.getTuple(), cmp.getKeyFieldCount());
+                                TupleUtils.copyTuple(tupleBuilder, queueHead.getTuple(), cmp.getKeyFieldCount());
                                 copyTuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
-
                                 // Unlatches/unpins the leaf page of the index.
                                 rangeCursors[0].reset();
-
-                                // Tries to reconcile.
-                                if (checkElement.getCursorIndex() == 0) {
-                                    searchCallback.reconcile(copyTuple);
-                                } else {
-                                    // If this element is from the disk component, we can call complete()
-                                    // after reconcile() since we can guarantee that there is no change.
-                                    searchCallback.reconcile(checkElement.getTuple());
-                                    searchCallback.complete(checkElement.getTuple());
-                                }
+                                // Reconcile.
+                                searchCallback.reconcile(copyTuple);
                                 // Re-traverses the index.
                                 reusablePred.setLowKey(copyTuple, true);
                                 btreeAccessors[0].search(rangeCursors[0], reusablePred);
-                                boolean isNotExhaustedCursor =
-                                        pushIntoQueueFromCursorAndReplaceThisElement(mutableElement);
-
-                                if (checkElement.getCursorIndex() == 0) {
-                                    if (!isNotExhaustedCursor
-                                            || cmp.compare(copyTuple, mutableElement.getTuple()) != 0) {
-                                        // The searched key no longer exists. We call cancel() to
-                                        // reverse the effect of reconcile() method.
-                                        searchCallback.cancel(copyTuple);
-                                        continue;
-                                    }
-                                    // The searched key is still there.
-                                    // TODO: do we need to call or not call complete() in this case?
-                                    searchCallback.complete(copyTuple);
+                                //------
+                                includeMutableComponent = pushIntoQueueFromCursorAndReplaceThisElement(mutableElement);
+                                // now that we have completed the search and we have latches over the pages,
+                                // it is safe to complete the operation.. but as per the API of the callback
+                                // we only complete if we're producing this tuple
+                                // get head again
+                                queueHead = outputPriorityQueue.peek();
+                                /*
+                                 * We need to restart in one of two cases:
+                                 * 1. no more elements in the priority queue.
+                                 * 2. the key of the head has changed (which means we need to call proceed)
+                                 */
+                                if (queueHead == null || cmp.compare(copyTuple, queueHead.getTuple()) != 0) {
+                                    // cancel since we're not continuing
+                                    searchCallback.cancel(copyTuple);
+                                    continue;
                                 }
+                                searchCallback.complete(copyTuple);
+                                // it is safe to proceed now
                             } else {
-                                // The mutable cursor is exhausted and it couldn't find the element.
-                                // The failed element did not come from the in-memory component.
-                                searchCallback.reconcile(checkElement.getTuple());
+                                // There are no more elements in the memory component.. can safely skip locking for the
+                                // remaining operations
+                                includeMutableComponent = false;
                             }
-                        } else {
-                            // proceed() failed. However, there is no in-memory component.
-                            // So just call reconcile.
-                            searchCallback.reconcile(checkElement.getTuple());
                         }
                     }
                 }
@@ -163,14 +142,11 @@
                 // If there is no previous tuple or the previous tuple can be ignored.
                 // This check is needed not to release the same tuple again.
                 if (outputElement == null) {
-                    if (isDeleted(checkElement) && !returnDeletedTuples) {
+                    if (isDeleted(queueHead) && !returnDeletedTuples) {
                         // If the key has been deleted then pop it and set needPush to true.
                         // We cannot push immediately because the tuple may be
                         // modified if hasNext() is called
                         outputElement = outputPriorityQueue.poll();
-                        if (!resultOfSearchCallBackProceed) {
-                            searchCallback.cancel(checkElement.getTuple());
-                        }
                         needPushElementIntoQueue = true;
                         canCallProceed = false;
                     } else {
@@ -178,12 +154,11 @@
                     }
                 } else {
                     // Compare the previous tuple and the head tuple in the PQ
-                    if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
+                    if (compare(cmp, outputElement.getTuple(), queueHead.getTuple()) == 0) {
                         // If the previous tuple and the head tuple are
                         // identical
                         // then pop the head tuple and push the next tuple from
                         // the tree of head tuple
-
                         // the head element of PQ is useless now
                         PriorityQueueElement e = outputPriorityQueue.poll();
                         pushIntoQueueFromCursorAndReplaceThisElement(e);
@@ -206,6 +181,22 @@
                 canCallProceed = true;
             }
         }
+
+    }
+
+    private PriorityQueueElement removeMutable(PriorityQueue<PriorityQueueElement> outputPriorityQueue) {
+        // Scans the PQ for the mutable component's element and delete it
+        // since it can be changed.
+        // (i.e. we can't ensure that the element is the most current one.)
+        Iterator<PriorityQueueElement> it = outputPriorityQueue.iterator();
+        while (it.hasNext()) {
+            PriorityQueueElement mutableElement = it.next();
+            if (mutableElement.getCursorIndex() == 0) {
+                it.remove();
+                return mutableElement;
+            }
+        }
+        return null;
     }
 
     @Override