Implemented lsm inverted index range search cursor that is used in merges.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_inverted_index_updates_new@1847 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexCursor.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexCursor.java
index 0a53cf6..5a23fc6 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexCursor.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexCursor.java
@@ -19,8 +19,8 @@
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 
 public interface IIndexCursor {
-    public void open(ICursorInitialState initialState,
-            ISearchPredicate searchPred) throws HyracksDataException;      
+    public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws IndexException,
+            HyracksDataException;
 
     public boolean hasNext() throws HyracksDataException;
 
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 832f88e..a734ea4 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -15,14 +15,11 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
 
-import java.util.Comparator;
 import java.util.Iterator;
-import java.util.PriorityQueue;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
 import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
@@ -32,7 +29,6 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeSearchCursor;
 
 public class LSMBTreeRangeSearchCursor extends LSMTreeSearchCursor {
@@ -40,7 +36,6 @@
     private final RangePredicate reusablePred;
 
     private ISearchOperationCallback searchCallback;
-    private PriorityQueueComparator pqCmp;
     private RangePredicate predicate;
     private IIndexAccessor memBTreeAccessor;
     private ArrayTupleBuilder tupleBuilder;
@@ -51,15 +46,6 @@
         this.reusablePred = new RangePredicate(null, null, true, true, null, null);
     }
 
-    public void initPriorityQueue() throws HyracksDataException {
-        int pqInitSize = (rangeCursors.length > 0) ? rangeCursors.length : 1;
-        outputPriorityQueue = new PriorityQueue<PriorityQueueElement>(pqInitSize, pqCmp);
-        for (int i = 0; i < rangeCursors.length; i++) {
-            pushIntoPriorityQueue(new PriorityQueueElement(i));
-        }
-        checkPriorityQueue();
-    }
-
     @Override
     public boolean hasNext() throws HyracksDataException {
         checkPriorityQueue();
@@ -154,42 +140,4 @@
         reusablePred.setHighKeyComparator(predicate.getHighKeyComparator());
         setPriorityQueueComparator();
     }
-
-    @Override
-    protected void setPriorityQueueComparator() {
-        if (pqCmp == null || cmp != pqCmp.getMultiComparator()) {
-            pqCmp = new PriorityQueueComparator(cmp);
-        }
-    }
-
-    public class PriorityQueueComparator implements Comparator<PriorityQueueElement> {
-
-        private final MultiComparator cmp;
-
-        public PriorityQueueComparator(MultiComparator cmp) {
-            this.cmp = cmp;
-        }
-
-        @Override
-        public int compare(PriorityQueueElement elementA, PriorityQueueElement elementB) {
-            int result = cmp.compare(elementA.getTuple(), elementB.getTuple());
-            if (result != 0) {
-                return result;
-            }
-            if (elementA.getCursorIndex() > elementB.getCursorIndex()) {
-                return 1;
-            } else {
-                return -1;
-            }
-        }
-
-        public MultiComparator getMultiComparator() {
-            return cmp;
-        }
-    }
-
-    @Override
-    protected int compare(MultiComparator cmp, ITupleReference tupleA, ITupleReference tupleB) {
-        return cmp.compare(tupleA, tupleB);
-    }
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeSearchCursor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeSearchCursor.java
index 5d0a420..d2a757d 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeSearchCursor.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeSearchCursor.java
@@ -15,11 +15,13 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
 
+import java.util.Comparator;
 import java.util.PriorityQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMTreeTupleReference;
@@ -28,8 +30,9 @@
 
 public abstract class LSMTreeSearchCursor implements ITreeIndexCursor {
     protected PriorityQueueElement outputElement;
-    protected ITreeIndexCursor[] rangeCursors;
+    protected IIndexCursor[] rangeCursors;
     protected PriorityQueue<PriorityQueueElement> outputPriorityQueue;
+    protected PriorityQueueComparator pqCmp;
     protected MultiComparator cmp;
     protected boolean needPush;
     protected boolean includeMemComponent;
@@ -41,7 +44,16 @@
         needPush = false;
     }
 
-    public ITreeIndexCursor getCursor(int cursorIndex) {
+    public void initPriorityQueue() throws HyracksDataException {
+        int pqInitSize = (rangeCursors.length > 0) ? rangeCursors.length : 1;
+        outputPriorityQueue = new PriorityQueue<PriorityQueueElement>(pqInitSize, pqCmp);
+        for (int i = 0; i < rangeCursors.length; i++) {
+            pushIntoPriorityQueue(new PriorityQueueElement(i));
+        }
+        checkPriorityQueue();
+    }
+
+    public IIndexCursor getCursor(int cursorIndex) {
         return rangeCursors[cursorIndex];
     }
 
@@ -81,8 +93,6 @@
         }
     }
 
-    protected abstract void setPriorityQueueComparator();
-
     @Override
     public ICachedPage getPage() {
         // do nothing
@@ -129,8 +139,6 @@
         return false;
     }
 
-    protected abstract int compare(MultiComparator cmp, ITupleReference tupleA, ITupleReference tupleB);
-
     protected void checkPriorityQueue() throws HyracksDataException {
         while (!outputPriorityQueue.isEmpty() || needPush == true) {
             if (!outputPriorityQueue.isEmpty()) {
@@ -204,4 +212,40 @@
             this.tuple = tuple;
         }
     }
+
+    public class PriorityQueueComparator implements Comparator<PriorityQueueElement> {
+
+        protected final MultiComparator cmp;
+
+        public PriorityQueueComparator(MultiComparator cmp) {
+            this.cmp = cmp;
+        }
+
+        @Override
+        public int compare(PriorityQueueElement elementA, PriorityQueueElement elementB) {
+            int result = cmp.compare(elementA.getTuple(), elementB.getTuple());
+            if (result != 0) {
+                return result;
+            }
+            if (elementA.getCursorIndex() > elementB.getCursorIndex()) {
+                return 1;
+            } else {
+                return -1;
+            }
+        }
+
+        public MultiComparator getMultiComparator() {
+            return cmp;
+        }
+    }
+
+    protected void setPriorityQueueComparator() {
+        if (pqCmp == null || cmp != pqCmp.getMultiComparator()) {
+            pqCmp = new PriorityQueueComparator(cmp);
+        }
+    }
+
+    protected int compare(MultiComparator cmp, ITupleReference tupleA, ITupleReference tupleB) {
+        return cmp.compare(tupleA, tupleB);
+    }
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index fb943e7..db43255 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -30,6 +30,7 @@
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
 import edu.uci.ics.hyracks.storage.am.btree.util.BTreeUtils;
+import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
@@ -43,6 +44,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponentFinalizer;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
@@ -64,6 +66,7 @@
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory.InMemoryInvertedIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory.InMemoryInvertedIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndexFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.InvertedIndexSearchPredicate;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.InvertedIndexUtils;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
@@ -327,11 +330,27 @@
                     NoOpOperationCallback.INSTANCE);
             indexAccessors.add(accessor);
         }
-        LSMInvertedIndexCursorInitialState initState = new LSMInvertedIndexCursorInitialState(indexAccessors, ictx,
-                includeMemComponent, searcherRefCount, lsmHarness);
+        ICursorInitialState initState = createCursorInitialState(pred, ictx, includeMemComponent, searcherRefCount, indexAccessors);
         cursor.open(initState, pred);
     }
 
+    private ICursorInitialState createCursorInitialState(ISearchPredicate pred, IIndexOpContext ictx,
+            boolean includeMemComponent, AtomicInteger searcherRefCount, ArrayList<IIndexAccessor> indexAccessors) {
+        // TODO: This check is not pretty, but it does the job. Come up with something more OO in the future.
+        ICursorInitialState initState = null;
+        // Distinguish between regular searches and range searches (mostly used in merges).
+        if (pred instanceof InvertedIndexSearchPredicate) {
+            initState = new LSMInvertedIndexCursorInitialState(indexAccessors, ictx, includeMemComponent,
+                    searcherRefCount, lsmHarness);
+        } else {
+            InMemoryInvertedIndex memInvIndex = (InMemoryInvertedIndex) memComponent.getInvIndex();
+            MultiComparator cmp = MultiComparator.create(memInvIndex.getBTree().getComparatorFactories());
+            initState = new LSMInvertedIndexRangeSearchCursorInitialState(cmp, searcherRefCount, lsmHarness,
+                    indexAccessors, pred);
+        }
+        return initState;
+    }
+    
     @Override
     // TODO: Deal with deletions properly.
     public Object merge(List<Object> mergedComponents, ILSMIOOperation operation) throws HyracksDataException,
@@ -446,6 +465,7 @@
         return new LSMInvertedIndexComponent(diskInvertedIndex, diskDeletedKeysBTree);
     }
 
+    @Override
     public void addFlushedComponent(Object index) {
         diskComponents.addFirst(index);
     }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index c7d6075..050fb02 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -101,9 +101,7 @@
     @Override
     public void rangeSearch(IIndexCursor cursor, ISearchPredicate searchPred) throws IndexException,
             HyracksDataException {
-        // TODO: Figure out what the initial state is here.
-        //LSMInvertedIndexRangeSearchCursor rangeSearchCursor = (LSMInvertedIndexRangeSearchCursor) cursor;
-        //rangeSearchCursor.open(initialState, searchPred);
+        search(cursor, searchPred);
     }
     
     @Override
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
index a3e7bf1..4f13138 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
@@ -15,251 +15,70 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
 
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Comparator;
-import java.util.List;
-import java.util.PriorityQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
 import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex.OnDiskInvertedIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccessor;
 
-public class LSMInvertedIndexRangeSearchCursor implements IIndexCursor {
-
-    private LSMHarness harness;
-    private boolean includeMemComponent;
-    private AtomicInteger searcherRefCount;
-    private List<IIndexAccessor> indexAccessors;
-    private List<IIndexCursor> indexCursors;
-    private boolean flagEOF = false;
-    private boolean flagFirstNextCall = true;
-
-    private PriorityQueue<PriorityQueueElement> outputPriorityQueue;
-    private MultiComparator memoryInvertedIndexComparator;
-    private PriorityQueueComparator pqCmp;
-    private int tokenFieldCount;
-    private int invListFieldCount;
-    private ITupleReference resultTuple;
-    private BitSet closedCursors;
+public class LSMInvertedIndexRangeSearchCursor extends LSMTreeSearchCursor {
 
     @Override
-    public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
-        LSMInvertedIndexCursorInitialState lsmInitialState = (LSMInvertedIndexCursorInitialState) initialState;
-        this.harness = lsmInitialState.getLSMHarness();
-        this.includeMemComponent = lsmInitialState.getIncludeMemComponent();
-        this.searcherRefCount = lsmInitialState.getSearcherRefCount();
-        this.indexAccessors = lsmInitialState.getIndexAccessors();
-        this.indexCursors = new ArrayList<IIndexCursor>(indexAccessors.size());
-        LSMInvertedIndexOpContext opContext = (LSMInvertedIndexOpContext) lsmInitialState.getOpContext();
-        this.memoryInvertedIndexComparator = opContext.getComparator();
-        this.tokenFieldCount = opContext.getTokenFieldCount();
-        this.invListFieldCount = opContext.getInvListFieldCount();
-        closedCursors = new BitSet(indexAccessors.size());
+    public void open(ICursorInitialState initState, ISearchPredicate searchPred) throws IndexException,
+            HyracksDataException {
+        LSMInvertedIndexRangeSearchCursorInitialState lsmInitState = (LSMInvertedIndexRangeSearchCursorInitialState) initState;
+        cmp = lsmInitState.getOriginalKeyComparator();
+        int numComponents = lsmInitState.getNumComponents();
+        rangeCursors = new IIndexCursor[numComponents];
+        for (int i = 0; i < numComponents; i++) {
+            IInvertedIndexAccessor invIndexAccessor = (IInvertedIndexAccessor) lsmInitState.getIndexAccessors().get(i);
+            rangeCursors[i] = invIndexAccessor.createRangeSearchCursor();
+            invIndexAccessor.rangeSearch(rangeCursors[i], lsmInitState.getSearchPredicate());
 
-        //create all cursors
-        IIndexCursor cursor;
-        for (IIndexAccessor accessor : indexAccessors) {
-            OnDiskInvertedIndexAccessor invIndexAccessor = (OnDiskInvertedIndexAccessor) accessor;
-            cursor = invIndexAccessor.createRangeSearchCursor();
-            try {
-                invIndexAccessor.rangeSearch(cursor, searchPred);
-            } catch (IndexException e) {
-                throw new HyracksDataException(e);
-            }
-            indexCursors.add(cursor);
         }
+        searcherRefCount = lsmInitState.getSearcherRefCount();
+        lsmHarness = lsmInitState.getLSMHarness();
+        setPriorityQueueComparator();
+        initPriorityQueue();
     }
+    
+    protected void checkPriorityQueue() throws HyracksDataException {
+        while (!outputPriorityQueue.isEmpty() || needPush == true) {
+            if (!outputPriorityQueue.isEmpty()) {
+                PriorityQueueElement checkElement = outputPriorityQueue.peek();
+                // If there is no previous tuple or the previous tuple can be ignored
+                if (outputElement == null) {
+                    // TODO: This is the place where we need to check for deleted keys. Have a look at the super class to find out more.
+                    break;
+                } else {
+                    // Compare the previous tuple and the head tuple in the PQ
+                    if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
+                        // If the previous tuple and the head tuple are
+                        // identical
+                        // then pop the head tuple and push the next tuple from
+                        // the tree of head tuple
 
-    @Override
-    public boolean hasNext() throws HyracksDataException {
-
-        if (flagEOF) {
-            return false;
-        }
-
-        if (flagFirstNextCall) {
-            for (IIndexCursor c : indexCursors) {
-                if (c.hasNext()) {
-                    return true;
-                }
-            }
-        } else {
-            if (outputPriorityQueue.size() > 0) {
-                return true;
-            }
-        }
-
-        flagEOF = true;
-        return false;
-    }
-
-    @Override
-    public void next() throws HyracksDataException {
-
-        PriorityQueueElement pqElement;
-        IIndexCursor cursor;
-        int cursorIndex;
-
-        if (flagEOF) {
-            return;
-        }
-
-        //When the next() is called for the first time, initialize priority queue.
-        if (flagFirstNextCall) {
-            flagFirstNextCall = false;
-
-            //create and initialize PriorityQueue
-            pqCmp = new PriorityQueueComparator(memoryInvertedIndexComparator);
-            outputPriorityQueue = new PriorityQueue<PriorityQueueElement>(indexCursors.size(), pqCmp);
-
-            //read the first tuple from each cursor and insert into outputPriorityQueue
-
-            for (int i = 0; i < indexCursors.size(); i++) {
-                cursor = indexCursors.get(i);
-                if (cursor.hasNext()) {
-                    cursor.next();
-                    pqElement = new PriorityQueueElement(cursor.getTuple(), i);
-                    outputPriorityQueue.offer(pqElement);
-                }
-                //else {
-                //    //do nothing for the cursor who reached EOF.
-                //}
-            }
-        }
-
-        //If you reach here, priority queue is set up to provide the smallest <tokenFields, invListFields>
-        //Get the smallest element from priority queue. 
-        //This element will be the result tuple which will be served to the caller when getTuple() is called. 
-        //Then, insert new element from the cursor where the smallest element came from.
-        pqElement = outputPriorityQueue.poll();
-        if (pqElement != null) {
-            resultTuple = pqElement.getTuple();
-            cursorIndex = pqElement.getCursorIndex();
-            cursor = indexCursors.get(cursorIndex);
-            if (cursor.hasNext()) {
-                cursor.next();
-                pqElement = new PriorityQueueElement(cursor.getTuple(), cursorIndex);
-                outputPriorityQueue.offer(pqElement);
-            } else {
-                cursor.close();
-                closedCursors.set(cursorIndex, true);
-
-//                 If the current cursor reached EOF, read a tuple from another cursor and insert into the priority queue.
-                for (int i = 0; i < indexCursors.size(); i++) {
-                    if (closedCursors.get(i))
-                        continue;
-
-                    cursor = indexCursors.get(i);
-                    if (cursor.hasNext()) {
-                        cursor.next();
-                        pqElement = new PriorityQueueElement(cursor.getTuple(), i);
-                        outputPriorityQueue.offer(pqElement);
-                        break;
+                        // the head element of PQ is useless now
+                        PriorityQueueElement e = outputPriorityQueue.poll();
+                        pushIntoPriorityQueue(e);
                     } else {
-                        cursor.close();
-                        closedCursors.set(i, true);
+                        // If the previous tuple and the head tuple are different
+                        // the info of previous tuple is useless
+                        if (needPush == true) {
+                            pushIntoPriorityQueue(outputElement);
+                            needPush = false;
+                        }
+                        outputElement = null;
                     }
                 }
-                //if (i == indexCursors.size()) {
-                //    all cursors reached EOF and the only tuples that you have are in the priority queue.
-                //    do nothing here!.
-                //}
-            }
-        }
-        //else {
-        // do nothing!!
-        // which means when the getTuple() is called, the pre-existing result tuple or null will be returned to the caller.
-        //}
-
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        try {
-            for (int i = 0; i < indexCursors.size(); i++) {
-                if (closedCursors.get(i)) {
-                    continue;
-                }
-                indexCursors.get(i).close();
-                closedCursors.set(i, true);
-            }
-        } finally {
-            harness.closeSearchCursor(searcherRefCount, includeMemComponent);
-        }
-    }
-
-    @Override
-    public void reset() {
-        // TODO Auto-generated method stub
-    }
-
-    @Override
-    public ITupleReference getTuple() {
-        return resultTuple;
-    }
-
-    public class PriorityQueueComparator implements Comparator<PriorityQueueElement> {
-
-        private final MultiComparator cmp;
-
-        public PriorityQueueComparator(MultiComparator cmp) {
-            this.cmp = cmp;
-        }
-
-        @Override
-        public int compare(PriorityQueueElement elementA, PriorityQueueElement elementB) {
-            int result = cmp.compare(elementA.getTuple(), elementB.getTuple());
-            if (result != 0) {
-                return result;
-            }
-            if (elementA.getCursorIndex() > elementB.getCursorIndex()) {
-                return 1;
             } else {
-                return -1;
+                // the priority queue is empty and needPush
+                pushIntoPriorityQueue(outputElement);
+                needPush = false;
+                outputElement = null;
             }
         }
-
-        public MultiComparator getMultiComparator() {
-            return cmp;
-        }
     }
-
-    public class PriorityQueueElement {
-        private ITupleReference tuple;
-        private int cursorIndex;
-
-        public PriorityQueueElement(ITupleReference tuple, int cursorIndex) {
-//            reset(tuple, cursorIndex);
-            try {
-                reset(TupleUtils.copyTuple(tuple), cursorIndex);
-            } catch (HyracksDataException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
-            }
-        }
-
-        public ITupleReference getTuple() {
-            return tuple;
-        }
-
-        public int getCursorIndex() {
-            return cursorIndex;
-        }
-
-        public void reset(ITupleReference tuple, int cursorIndex) {
-            this.tuple = tuple;
-            this.cursorIndex = cursorIndex;
-        }
-    }
-
 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursorInitialState.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursorInitialState.java
new file mode 100644
index 0000000..645658e
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursorInitialState.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+
+public class LSMInvertedIndexRangeSearchCursorInitialState implements ICursorInitialState {
+
+    private final MultiComparator cmp;
+    private final AtomicInteger searcherRefCount;
+    private final LSMHarness lsmHarness;
+
+    private final ArrayList<IIndexAccessor> indexAccessors;
+    private final ISearchPredicate predicate;
+
+    public LSMInvertedIndexRangeSearchCursorInitialState(MultiComparator cmp, AtomicInteger searcherRefCount,
+            LSMHarness lsmHarness, ArrayList<IIndexAccessor> indexAccessors, ISearchPredicate predicate) {
+        this.cmp = cmp;
+        this.searcherRefCount = searcherRefCount;
+        this.lsmHarness = lsmHarness;
+        this.indexAccessors = indexAccessors;
+        this.predicate = predicate;
+    }
+
+    public int getNumComponents() {
+        return indexAccessors.size();
+    }
+
+    @Override
+    public ICachedPage getPage() {
+        return null;
+    }
+
+    @Override
+    public void setPage(ICachedPage page) {
+    }
+
+    public AtomicInteger getSearcherRefCount() {
+        return searcherRefCount;
+    }
+
+    public LSMHarness getLSMHarness() {
+        return lsmHarness;
+    }
+
+    @Override
+    public ISearchOperationCallback getSearchOperationCallback() {
+        return null;
+    }
+
+    @Override
+    public void setSearchOperationCallback(ISearchOperationCallback searchCallback) {
+        // Do nothing.
+    }
+
+    public ArrayList<IIndexAccessor> getIndexAccessors() {
+        return indexAccessors;
+    }
+
+    public ISearchPredicate getSearchPredicate() {
+        return predicate;
+    }
+
+    @Override
+    public MultiComparator getOriginalKeyComparator() {
+        return cmp;
+    }
+
+    @Override
+    public void setOriginialKeyComparator(MultiComparator originalCmp) {
+        // Do nothing.
+    }
+}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
index 4b34dc9..63ad633 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
@@ -15,7 +15,6 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
 
-import java.util.Comparator;
 import java.util.PriorityQueue;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -39,7 +38,6 @@
     private BTreeRangeSearchCursor memBTreeCursor;
     private RangePredicate btreeRangePredicate;
     private ITreeIndexAccessor memBTreeAccessor;
-    protected PriorityQueueHilbertComparator pqCmp;
     private boolean foundNext;
     private ITupleReference frameTuple;
     private int[] comparatorFields;
@@ -185,13 +183,12 @@
         }
     }
 
-    public class PriorityQueueHilbertComparator implements Comparator<PriorityQueueElement> {
+    public class PriorityQueueHilbertComparator extends PriorityQueueComparator {
 
-        private final MultiComparator cmp;
         private final int[] comparatorFields;
 
         public PriorityQueueHilbertComparator(MultiComparator cmp, int[] comparatorFields) {
-            this.cmp = cmp;
+            super(cmp);
             this.comparatorFields = comparatorFields;
         }
 
@@ -207,10 +204,6 @@
                 return -1;
             }
         }
-
-        public MultiComparator getMultiComparator() {
-            return cmp;
-        }
     }
 
 }
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
index 71da984..7f2e0b6 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
@@ -736,7 +736,7 @@
     }
 
     private void search(ITreeIndexCursor cursor, ISearchPredicate searchPred, RTreeOpContext ctx)
-            throws HyracksDataException, TreeIndexException {
+            throws HyracksDataException, IndexException {
         ctx.reset();
         ctx.cursor = cursor;
 
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/common/AbstractInvertedIndexLoadTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/common/AbstractInvertedIndexLoadTest.java
index 611e13e..29e6639 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/common/AbstractInvertedIndexLoadTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/common/AbstractInvertedIndexLoadTest.java
@@ -53,10 +53,10 @@
             // Validate index and compare against expected index.
             invIndex.validate();
             if (invIndexType == InvertedIndexType.INMEMORY || invIndexType == InvertedIndexType.ONDISK) {
-                // These two different comparison methods exercise different features of the inverted-indexes.
-                InvertedIndexTestUtils.compareActualAndExpectedIndexes(testCtx);
-                InvertedIndexTestUtils.compareActualAndExpectedIndexesRangeSearch(testCtx);
+                // This comparison method exercises different features of these types of inverted-indexes.
+                InvertedIndexTestUtils.compareActualAndExpectedIndexes(testCtx);                
             }
+            InvertedIndexTestUtils.compareActualAndExpectedIndexesRangeSearch(testCtx);
         }
 
         invIndex.deactivate();