fixed LSM search cursor to properly release mem component latches during search opcallback sequence

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1761 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeRangeSearchCursor.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeRangeSearchCursor.java
index 183af6e..d50a2c8 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeRangeSearchCursor.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeRangeSearchCursor.java
@@ -194,6 +194,8 @@
                 if (originalKeyCmp.compare(reconciliationTuple, frameTuple) == 0) {
                     if (highKey == null || tupleIndex <= stopTupleIndex) {
                         return true;
+                    } else {
+                        return false;
                     }
 
                 } else { // otherwise do the opCallback dance again with the new tuple we found
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 7f6e511..fe86a2f 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -328,7 +328,8 @@
         int numDiskBTrees = diskComponents.size();
         int numBTrees = (includeMemComponent) ? numDiskBTrees + 1 : numDiskBTrees;
         LSMBTreeCursorInitialState initialState = new LSMBTreeCursorInitialState(numBTrees, insertLeafFrameFactory,
-                ctx.cmp, includeMemComponent, searcherRefCount, lsmHarness, null);
+                ctx.cmp, includeMemComponent, searcherRefCount, lsmHarness, memBTree.createAccessor(
+                        NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE), pred, ctx.searchCallback);
         lsmTreeCursor.open(initialState, pred);
 
         int cursorIx;
@@ -347,7 +348,8 @@
         ListIterator<Object> diskBTreesIter = diskComponents.listIterator();
         while (diskBTreesIter.hasNext()) {
             BTree diskBTree = (BTree) diskBTreesIter.next();
-            diskBTreeAccessors[diskBTreeIx] = diskBTree.createAccessor(ctx.modificationCallback, ctx.searchCallback);
+            diskBTreeAccessors[diskBTreeIx] = diskBTree.createAccessor(NoOpOperationCallback.INSTANCE,
+                    NoOpOperationCallback.INSTANCE);
             diskBTreeAccessors[diskBTreeIx].search(lsmTreeCursor.getCursor(cursorIx), pred);
             cursorIx++;
             diskBTreeIx++;
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java
index 9469e91..329c412 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java
@@ -18,7 +18,9 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
@@ -33,11 +35,13 @@
     private final AtomicInteger searcherfRefCount;
     private final LSMHarness lsmHarness;
 
+    private final IIndexAccessor memBtreeAccessor;
+    private final ISearchPredicate predicate;
     private ISearchOperationCallback searchCallback;
 
     public LSMBTreeCursorInitialState(int numBTrees, ITreeIndexFrameFactory leafFrameFactory, MultiComparator cmp,
             boolean includeMemComponent, AtomicInteger searcherfRefCount, LSMHarness lsmHarness,
-            ISearchOperationCallback searchCallback) {
+            IIndexAccessor memBtreeAccessor, ISearchPredicate predicate, ISearchOperationCallback searchCallback) {
         this.numBTrees = numBTrees;
         this.leafFrameFactory = leafFrameFactory;
         this.cmp = cmp;
@@ -45,6 +49,8 @@
         this.searcherfRefCount = searcherfRefCount;
         this.lsmHarness = lsmHarness;
         this.searchCallback = searchCallback;
+        this.memBtreeAccessor = memBtreeAccessor;
+        this.predicate = predicate;
     }
 
     public int getNumBTrees() {
@@ -90,6 +96,14 @@
         this.searchCallback = searchCallback;
     }
 
+    public IIndexAccessor getMemBTreeAccessor() {
+        return memBtreeAccessor;
+    }
+
+    public ISearchPredicate getSearchPredicate() {
+        return predicate;
+    }
+
     @Override
     public MultiComparator getOriginalKeyComparator() {
         return cmp;
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index f7e3aae..0262a2e 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -16,21 +16,27 @@
 package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
 
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.PriorityQueue;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
 import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
-import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeSearchCursor;
 
 public class LSMBTreeRangeSearchCursor extends LSMTreeSearchCursor {
     private PriorityQueueComparator pqCmp;
-    private ISearchOperationCallback searchCallback;
+    private IIndexAccessor memBTreeAccessor;
+    private RangePredicate predicate;
+    private RangePredicate reusablePred = new RangePredicate(null, null, true, true, null, null);
 
     public LSMBTreeRangeSearchCursor() {
         outputElement = null;
@@ -52,9 +58,86 @@
     }
 
     @Override
+    public boolean hasNext() throws HyracksDataException {
+        checkPriorityQueue();
+        PriorityQueueElement pqHead = outputPriorityQueue.peek();
+        if (pqHead == null) {
+            // pq is empty
+            return false;
+        }
+
+        if (searchCallback.proceed(pqHead.getTuple())) {
+            // if proceed is successful, then there's no need for doing the "unlatch dance"
+            return true;
+        }
+
+        if (includeMemComponent) {
+            PriorityQueueElement inMemElement = null;
+            boolean inMemElementFound = false;
+
+            // scan the PQ for the in-memory component's element
+            if (outputElement != null && outputElement.getCursorIndex() == 0) {
+                inMemElement = outputElement;
+            } else {
+                Iterator<PriorityQueueElement> it = outputPriorityQueue.iterator();
+                while (it.hasNext()) {
+                    inMemElement = it.next();
+                    if (inMemElement.getCursorIndex() == 0) {
+                        inMemElementFound = true;
+                        outputPriorityQueue.remove(inMemElement);
+                        break;
+                    }
+                }
+            }
+
+            if (!inMemElementFound && inMemElement != null) {
+                searchCallback.reconcile(pqHead.getTuple());
+                return true;
+            }
+
+            // copy the in-mem tuple
+            ITupleReference inMemTuple = TupleUtils.copyTuple(inMemElement.getTuple());
+            // unlatch
+            rangeCursors[0].reset();
+            // reconcile
+            if (pqHead.getCursorIndex() == 0) {
+                searchCallback.reconcile(inMemTuple);
+            } else {
+                searchCallback.reconcile(pqHead.getTuple());
+            }
+
+            reusablePred.setLowKey(inMemTuple, true);
+            reusablePred.setLowKeyComparator(cmp);
+            reusablePred.setHighKey(predicate.getHighKey(), predicate.isHighKeyInclusive());
+            reusablePred.setHighKeyComparator(predicate.getHighKeyComparator());
+            try {
+                memBTreeAccessor.search(rangeCursors[0], reusablePred);
+            } catch (IndexException e) {
+                throw new HyracksDataException(e);
+            }
+
+            // todo: make lsmbtreetuplereference copy
+            if (rangeCursors[0].hasNext()) {
+                rangeCursors[0].next();
+                inMemElement.reset(rangeCursors[0].getTuple(), 0);
+                if (inMemElementFound) {
+                    outputPriorityQueue.offer(inMemElement);
+                }
+            } else {
+                rangeCursors[0].close();
+            }
+        } else {
+            searchCallback.reconcile(pqHead.getTuple());
+        }
+
+        return true;
+        //        return !outputPriorityQueue.isEmpty();
+    }
+
+    @Override
     public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
+        super.open(initialState, searchPred);
         LSMBTreeCursorInitialState lsmInitialState = (LSMBTreeCursorInitialState) initialState;
-        searchCallback = lsmInitialState.getSearchOperationCallback();
         cmp = lsmInitialState.getCmp();
         int numBTrees = lsmInitialState.getNumBTrees();
         rangeCursors = new BTreeRangeSearchCursor[numBTrees];
@@ -65,6 +148,8 @@
         includeMemComponent = lsmInitialState.getIncludeMemComponent();
         searcherRefCount = lsmInitialState.getSearcherRefCount();
         lsmHarness = lsmInitialState.getLSMHarness();
+        memBTreeAccessor = lsmInitialState.getMemBTreeAccessor();
+        predicate = (RangePredicate) lsmInitialState.getSearchPredicate();
         setPriorityQueueComparator();
     }
 
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 66ea185..6e22d53 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -185,6 +185,13 @@
             return;
         }
 
+        // TODO: Move this to just before the merge cleanup and remove latching on disk components
+        // The implementation of this call must take any necessary steps to make
+        // the new component permanent, and mark it as valid (usually this means
+        // forcing all pages of the tree to disk, possibly with some extra
+        // information to mark the tree as valid).
+        lsmIndex.getComponentFinalizer().finalize(newComponent);
+
         // Remove the old Trees from the list, and add the new merged Tree(s).
         // Also, swap the searchRefCount.
         synchronized (diskComponentsSync) {
@@ -210,12 +217,6 @@
             }
         }
 
-        // The implementation of this call must take any necessary steps to make
-        // the new component permanent, and mark it as valid (usually this means
-        // forcing all pages of the tree to disk, possibly with some extra
-        // information to mark the tree as valid).
-        lsmIndex.getComponentFinalizer().finalize(newComponent);
-
         // Cleanup. At this point we have guaranteed that no searchers are
         // touching the old on-disk Trees (localSearcherRefCount == 0).
         lsmIndex.cleanUpAfterMerge(mergedComponents);
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeSearchCursor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeSearchCursor.java
index f622d87..70f1c0f 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeSearchCursor.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeSearchCursor.java
@@ -20,6 +20,9 @@
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMTreeTupleReference;
@@ -37,6 +40,7 @@
     protected boolean includeMemComponent;
     protected AtomicInteger searcherRefCount;
     protected LSMHarness lsmHarness;
+    protected ISearchOperationCallback searchCallback;
 
     public LSMTreeSearchCursor() {
         outputElement = null;
@@ -133,8 +137,7 @@
         while (!outputPriorityQueue.isEmpty() || needPush == true) {
             if (!outputPriorityQueue.isEmpty()) {
                 PriorityQueueElement checkElement = outputPriorityQueue.peek();
-                // If there is no previous tuple or the previous tuple can be
-                // ignored
+                // If there is no previous tuple or the previous tuple can be ignored
                 if (outputElement == null) {
                     // Test the tuple is a delete tuple or not
                     if (((ILSMTreeTupleReference) checkElement.getTuple()).isAntimatter() == true) {
@@ -160,8 +163,7 @@
                         // int treeNum = reusedElement.getTreeNum();
                         pushIntoPriorityQueue(reusedElement.getCursorIndex());
                     } else {
-                        // If the previous tuple and the head tuple are
-                        // different
+                        // If the previous tuple and the head tuple are different
                         // the info of previous tuple is useless
                         if (needPush == true) {
                             reusedElement = outputElement;
@@ -207,4 +209,9 @@
             this.cursorIndex = cursorIndex;
         }
     }
+
+    @Override
+    public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
+        searchCallback = initialState.getSearchOperationCallback();
+    }
 }