Fixed the issue of not locking in-memory anti-mattered tuples in the LSM_BTree range cursor which may result in reading un-committed data.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@2931 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
index d1aa6a8..f3a5c82 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
@@ -75,6 +75,7 @@
                 if (reconciled || searchCallback.proceed(predicate.getLowKey())) {
                     // if proceed is successful, then there's no need for doing the "unlatch dance"
                     if (((ILSMTreeTupleReference) rangeCursors[i].getTuple()).isAntimatter()) {
+                        searchCallback.cancel(predicate.getLowKey());
                         return false;
                     } else {
                         frameTuple = rangeCursors[i].getTuple();
@@ -97,6 +98,7 @@
                     if (rangeCursors[i].hasNext()) {
                         rangeCursors[i].next();
                         if (((ILSMTreeTupleReference) rangeCursors[i].getTuple()).isAntimatter()) {
+                            searchCallback.cancel(predicate.getLowKey());
                             return false;
                         } else {
                             frameTuple = rangeCursors[i].getTuple();
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index ff01ce8..1d0310e 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -46,6 +46,7 @@
     private RangePredicate predicate;
     private IIndexAccessor memBTreeAccessor;
     private ArrayTupleBuilder tupleBuilder;
+    private boolean proceed = true;
 
     public LSMBTreeRangeSearchCursor(ILSMIndexOperationContext opCtx) {
         super(opCtx);
@@ -54,82 +55,116 @@
     }
 
     @Override
-    public boolean hasNext() throws HyracksDataException, IndexException {
-        checkPriorityQueue();
-        PriorityQueueElement pqHead = outputPriorityQueue.peek();
-        if (pqHead == null) {
-            // PQ is empty
-            return false;
-        }
+    public void reset() throws HyracksDataException, IndexException {
+        super.reset();
+        proceed = true;
+    }
 
-        assert outputElement == null;
+    @Override
+    public void next() throws HyracksDataException {
+        outputElement = outputPriorityQueue.poll();
+        needPush = true;
+        proceed = false;
+    }
 
-        if (searchCallback.proceed(pqHead.getTuple())) {
-            // if proceed is successful, then there's no need for doing the "unlatch dance"
-            return true;
-        }
+    protected void checkPriorityQueue() throws HyracksDataException, IndexException {
+        while (!outputPriorityQueue.isEmpty() || needPush == true) {
+            if (!outputPriorityQueue.isEmpty()) {
+                PriorityQueueElement checkElement = outputPriorityQueue.peek();
+                if (proceed && !searchCallback.proceed(checkElement.getTuple())) {
+                    if (includeMemComponent) {
+                        PriorityQueueElement inMemElement = null;
+                        boolean inMemElementFound = false;
+                        // scan the PQ for the in-memory component's element
+                        Iterator<PriorityQueueElement> it = outputPriorityQueue.iterator();
+                        while (it.hasNext()) {
+                            inMemElement = it.next();
+                            if (inMemElement.getCursorIndex() == 0) {
+                                inMemElementFound = true;
+                                it.remove();
+                                break;
+                            }
+                        }
+                        if (inMemElementFound) {
+                            // copy the in-mem tuple
+                            if (tupleBuilder == null) {
+                                tupleBuilder = new ArrayTupleBuilder(cmp.getKeyFieldCount());
+                            }
+                            TupleUtils.copyTuple(tupleBuilder, inMemElement.getTuple(), cmp.getKeyFieldCount());
+                            copyTuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
 
-        if (includeMemComponent) {
-            PriorityQueueElement inMemElement = null;
-            boolean inMemElementFound = false;
+                            // unlatch/unpin
+                            rangeCursors[0].reset();
 
-            // scan the PQ for the in-memory component's element
-            Iterator<PriorityQueueElement> it = outputPriorityQueue.iterator();
-            while (it.hasNext()) {
-                inMemElement = it.next();
-                if (inMemElement.getCursorIndex() == 0) {
-                    inMemElementFound = true;
-                    it.remove();
-                    break;
+                            // reconcile
+                            if (checkElement.getCursorIndex() == 0) {
+                                searchCallback.reconcile(copyTuple);
+                            } else {
+                                searchCallback.reconcile(checkElement.getTuple());
+                            }
+                            // retraverse
+                            reusablePred.setLowKey(copyTuple, true);
+                            try {
+                                memBTreeAccessor.search(rangeCursors[0], reusablePred);
+                            } catch (IndexException e) {
+                                throw new HyracksDataException(e);
+                            }
+                            pushIntoPriorityQueue(inMemElement);
+                            if (cmp.compare(copyTuple, inMemElement.getTuple()) != 0) {
+                                searchCallback.cancel(copyTuple);
+                                continue;
+                            }
+                        } else {
+                            // the in-memory cursor is exhausted
+                            searchCallback.reconcile(checkElement.getTuple());
+                        }
+                    } else {
+                        searchCallback.reconcile(checkElement.getTuple());
+                    }
                 }
-            }
+                // If there is no previous tuple or the previous tuple can be ignored
+                if (outputElement == null) {
+                    if (isDeleted(checkElement)) {
+                        // If the key has been deleted then pop it and set needPush to true.
+                        // We cannot push immediately because the tuple may be
+                        // modified if hasNext() is called
+                        outputElement = outputPriorityQueue.poll();
+                        searchCallback.cancel(checkElement.getTuple());
+                        needPush = true;
+                        proceed = false;
+                    } else {
+                        break;
+                    }
+                } else {
+                    // Compare the previous tuple and the head tuple in the PQ
+                    if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
+                        // If the previous tuple and the head tuple are
+                        // identical
+                        // then pop the head tuple and push the next tuple from
+                        // the tree of head tuple
 
-            if (!inMemElementFound) {
-                // the in-memory cursor is exhausted
-                searchCallback.reconcile(pqHead.getTuple());
-                return true;
-            }
-
-            // copy the in-mem tuple
-            if (tupleBuilder == null) {
-                tupleBuilder = new ArrayTupleBuilder(cmp.getKeyFieldCount());
-            }
-            TupleUtils.copyTuple(tupleBuilder, inMemElement.getTuple(), cmp.getKeyFieldCount());
-            copyTuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
-
-            // unlatch/unpin
-            rangeCursors[0].reset();
-
-            // reconcile
-            if (pqHead.getCursorIndex() == 0) {
-                searchCallback.reconcile(copyTuple);
+                        // the head element of PQ is useless now
+                        PriorityQueueElement e = outputPriorityQueue.poll();
+                        pushIntoPriorityQueue(e);
+                    } else {
+                        // If the previous tuple and the head tuple are different
+                        // the info of previous tuple is useless
+                        if (needPush == true) {
+                            pushIntoPriorityQueue(outputElement);
+                            needPush = false;
+                        }
+                        proceed = true;
+                        outputElement = null;
+                    }
+                }
             } else {
-                searchCallback.reconcile(pqHead.getTuple());
+                // the priority queue is empty and needPush
+                pushIntoPriorityQueue(outputElement);
+                needPush = false;
+                outputElement = null;
+                proceed = true;
             }
-
-            // retraverse
-            reusablePred.setLowKey(copyTuple, true);
-            try {
-                memBTreeAccessor.search(rangeCursors[0], reusablePred);
-            } catch (IndexException e) {
-                throw new HyracksDataException(e);
-            }
-
-            if (!pushIntoPriorityQueue(inMemElement)) {
-                return !outputPriorityQueue.isEmpty();
-            }
-
-            if (pqHead.getCursorIndex() == 0) {
-                if (cmp.compare(copyTuple, inMemElement.getTuple()) != 0) {
-                    searchCallback.cancel(copyTuple);
-                }
-            }
-            checkPriorityQueue();
-        } else {
-            searchCallback.reconcile(pqHead.getTuple());
         }
-
-        return true;
     }
 
     @Override
@@ -178,5 +213,6 @@
             diskBTreeIx++;
         }
         initPriorityQueue();
+        proceed = true;
     }
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeSearchCursor.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeSearchCursor.java
index 82da312..6872520 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeSearchCursor.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeSearchCursor.java
@@ -92,6 +92,7 @@
         if (currentCursor != null) {
             currentCursor.close();
         }
+        currentCursor = null;
     }
 
     @Override
@@ -99,6 +100,7 @@
         if (currentCursor != null) {
             currentCursor.reset();
         }
+        currentCursor = null;
     }
 
     @Override
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
index 92d3679..9fc351a 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
@@ -151,50 +151,7 @@
         return ((ILSMTreeTupleReference) checkElement.getTuple()).isAntimatter();
     }
 
-    protected void checkPriorityQueue() throws HyracksDataException, IndexException {
-        while (!outputPriorityQueue.isEmpty() || needPush == true) {
-            if (!outputPriorityQueue.isEmpty()) {
-                PriorityQueueElement checkElement = outputPriorityQueue.peek();
-                // If there is no previous tuple or the previous tuple can be ignored
-                if (outputElement == null) {
-                    if (isDeleted(checkElement)) {
-                        // If the key has been deleted then pop it and set needPush to true.
-                        // We cannot push immediately because the tuple may be
-                        // modified if hasNext() is called
-                        outputElement = outputPriorityQueue.poll();
-                        needPush = true;
-                    } else {
-                        break;
-                    }
-                } else {
-                    // Compare the previous tuple and the head tuple in the PQ
-                    if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
-                        // If the previous tuple and the head tuple are
-                        // identical
-                        // then pop the head tuple and push the next tuple from
-                        // the tree of head tuple
-
-                        // the head element of PQ is useless now
-                        PriorityQueueElement e = outputPriorityQueue.poll();
-                        pushIntoPriorityQueue(e);
-                    } else {
-                        // If the previous tuple and the head tuple are different
-                        // the info of previous tuple is useless
-                        if (needPush == true) {
-                            pushIntoPriorityQueue(outputElement);
-                            needPush = false;
-                        }
-                        outputElement = null;
-                    }
-                }
-            } else {
-                // the priority queue is empty and needPush
-                pushIntoPriorityQueue(outputElement);
-                needPush = false;
-                outputElement = null;
-            }
-        }
-    }
+    abstract protected void checkPriorityQueue() throws HyracksDataException, IndexException;
 
     @Override
     public boolean exclusiveLatchNodes() {
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
index 259af5b..1b5949a 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
@@ -113,4 +113,50 @@
         }
         return false;
     }
+    
+    @Override
+    protected void checkPriorityQueue() throws HyracksDataException, IndexException {
+        while (!outputPriorityQueue.isEmpty() || needPush == true) {
+            if (!outputPriorityQueue.isEmpty()) {
+                PriorityQueueElement checkElement = outputPriorityQueue.peek();
+                // If there is no previous tuple or the previous tuple can be ignored
+                if (outputElement == null) {
+                    if (isDeleted(checkElement)) {
+                        // If the key has been deleted then pop it and set needPush to true.
+                        // We cannot push immediately because the tuple may be
+                        // modified if hasNext() is called
+                        outputElement = outputPriorityQueue.poll();
+                        needPush = true;
+                    } else {
+                        break;
+                    }
+                } else {
+                    // Compare the previous tuple and the head tuple in the PQ
+                    if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
+                        // If the previous tuple and the head tuple are
+                        // identical
+                        // then pop the head tuple and push the next tuple from
+                        // the tree of head tuple
+
+                        // the head element of PQ is useless now
+                        PriorityQueueElement e = outputPriorityQueue.poll();
+                        pushIntoPriorityQueue(e);
+                    } else {
+                        // If the previous tuple and the head tuple are different
+                        // the info of previous tuple is useless
+                        if (needPush == true) {
+                            pushIntoPriorityQueue(outputElement);
+                            needPush = false;
+                        }
+                        outputElement = null;
+                    }
+                }
+            } else {
+                // the priority queue is empty and needPush
+                pushIntoPriorityQueue(outputElement);
+                needPush = false;
+                outputElement = null;
+            }
+        }
+    }
 }
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
index f9dd819..8632150 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
@@ -212,4 +212,49 @@
         }
     }
 
+    @Override
+    protected void checkPriorityQueue() throws HyracksDataException, IndexException {
+        while (!outputPriorityQueue.isEmpty() || needPush == true) {
+            if (!outputPriorityQueue.isEmpty()) {
+                PriorityQueueElement checkElement = outputPriorityQueue.peek();
+                // If there is no previous tuple or the previous tuple can be ignored
+                if (outputElement == null) {
+                    if (isDeleted(checkElement)) {
+                        // If the key has been deleted then pop it and set needPush to true.
+                        // We cannot push immediately because the tuple may be
+                        // modified if hasNext() is called
+                        outputElement = outputPriorityQueue.poll();
+                        needPush = true;
+                    } else {
+                        break;
+                    }
+                } else {
+                    // Compare the previous tuple and the head tuple in the PQ
+                    if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
+                        // If the previous tuple and the head tuple are
+                        // identical
+                        // then pop the head tuple and push the next tuple from
+                        // the tree of head tuple
+
+                        // the head element of PQ is useless now
+                        PriorityQueueElement e = outputPriorityQueue.poll();
+                        pushIntoPriorityQueue(e);
+                    } else {
+                        // If the previous tuple and the head tuple are different
+                        // the info of previous tuple is useless
+                        if (needPush == true) {
+                            pushIntoPriorityQueue(outputElement);
+                            needPush = false;
+                        }
+                        outputElement = null;
+                    }
+                }
+            } else {
+                // the priority queue is empty and needPush
+                pushIntoPriorityQueue(outputElement);
+                needPush = false;
+                outputElement = null;
+            }
+        }
+    }
 }
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
index f8ad0b2..9341f77 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
@@ -140,13 +140,11 @@
 
             @Override
             public boolean proceed(ITupleReference tuple) {
-                Assert.assertEquals(0, cmp.compare(SearchTask.this.tuple, tuple));
                 return false;
             }
 
             @Override
             public void reconcile(ITupleReference tuple) {
-                Assert.assertEquals(0, cmp.compare(SearchTask.this.tuple, tuple));
                 if (blockOnHigh) {
                     try {
                         TupleUtils.createIntegerTuple(builder, SearchTask.this.tuple, expectedAfterBlock);