- Added search operation to the LSM-RTree
- Fixed the r-tree in-memory buffer cache to relax the hard memory constraint 

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1043 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsmtree-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/common/freepage/InMemoryBufferCache.java b/hyracks-storage-am-lsmtree-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/common/freepage/InMemoryBufferCache.java
index 100cf79..d667336 100644
--- a/hyracks-storage-am-lsmtree-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/common/freepage/InMemoryBufferCache.java
+++ b/hyracks-storage-am-lsmtree-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/common/freepage/InMemoryBufferCache.java
@@ -121,7 +121,7 @@
 		// Do nothing.
 	}
 
-    private class CachedPage implements ICachedPageInternal {
+    public class CachedPage implements ICachedPageInternal {
         private final int cpid;
         private final ByteBuffer buffer;
         private final ReadWriteLock latch;
diff --git a/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMBTreeOpContext.java b/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMBTreeOpContext.java
index f35ee34..6c4f98a 100644
--- a/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMBTreeOpContext.java
+++ b/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMBTreeOpContext.java
@@ -9,15 +9,15 @@
 
 public final class LSMBTreeOpContext extends BTreeOpContext {
 
-    public final BTree.BTreeAccessor memBtreeAccessor;
+    public final BTree.BTreeAccessor memBTreeAccessor;
 
     public LSMBTreeOpContext(BTree.BTreeAccessor memBtreeAccessor, ITreeIndexFrameFactory leafFrameFactory,
             ITreeIndexFrameFactory interiorFrameFactory, ITreeIndexMetaDataFrame metaFrame, MultiComparator cmp) {
         super(leafFrameFactory, interiorFrameFactory, metaFrame, cmp);
 
-        this.memBtreeAccessor = memBtreeAccessor;
+        this.memBTreeAccessor = memBtreeAccessor;
         // Overwrite the BTree accessor's op context with our LSMBTreeOpContext.
-        this.memBtreeAccessor.setOpContext(this);
+        this.memBTreeAccessor.setOpContext(this);
 
         reset(op);
     }
diff --git a/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTree.java
index c308473..2c837bf 100644
--- a/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTree.java
@@ -7,6 +7,7 @@
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
@@ -47,11 +48,13 @@
     private final ITreeIndexFrameFactory btreeInteriorFrameFactory;
     private final ITreeIndexFrameFactory rtreeLeafFrameFactory;
     private final ITreeIndexFrameFactory btreeLeafFrameFactory;
-    private final MultiComparator cmp;
+    private final MultiComparator rtreeCmp;
+    private final MultiComparator btreeCmp;
 
     // TODO: change to private, it's public only for LSMTreeSearchTest
     public LinkedList<ITreeIndex> inDiskRTreeList;
     public LinkedList<ITreeIndex> inDiskBTreeList;
+
     private LinkedList<ITreeIndex> mergedInDiskRTreeList;
     private LinkedList<ITreeIndex> mergedInDiskBTreeList;
     private int inDiskTreeCounter;
@@ -59,15 +62,17 @@
     private final BTreeFactory bTreeFactory;
     private final IFileMapManager fileMapManager;
     private int threadReferenceCounter;
+    private int threadRefCount;
     private boolean flushFlag;
 
-    public LSMRTree(IBufferCache rtreeMemCache, IBufferCache bufferCache, int fieldCount, MultiComparator cmp,
-    		InMemoryFreePageManager memFreePageManager, ITreeIndexFrameFactory rtreeInteriorFrameFactory,
-            ITreeIndexFrameFactory btreeInteriorFrameFactory, ITreeIndexFrameFactory rtreeLeafFrameFactory,
-            ITreeIndexFrameFactory btreeLeafFrameFactory, RTreeFactory rTreeFactory, BTreeFactory bTreeFactory,
-            IFileMapManager fileMapManager) {
+    public LSMRTree(IBufferCache rtreeMemCache, IBufferCache bufferCache, int fieldCount, MultiComparator rtreeCmp,
+            MultiComparator btreeCmp, InMemoryFreePageManager memFreePageManager,
+            ITreeIndexFrameFactory rtreeInteriorFrameFactory, ITreeIndexFrameFactory btreeInteriorFrameFactory,
+            ITreeIndexFrameFactory rtreeLeafFrameFactory, ITreeIndexFrameFactory btreeLeafFrameFactory,
+            RTreeFactory rTreeFactory, BTreeFactory bTreeFactory, IFileMapManager fileMapManager) {
         this.bufferCache = bufferCache;
-        this.cmp = cmp;
+        this.rtreeCmp = rtreeCmp;
+        this.btreeCmp = btreeCmp;
         this.rtreeInteriorFrameFactory = rtreeInteriorFrameFactory;
         this.btreeInteriorFrameFactory = btreeInteriorFrameFactory;
         this.rtreeLeafFrameFactory = rtreeLeafFrameFactory;
@@ -80,6 +85,7 @@
         this.inDiskTreeCounter = 0;
         this.fileMapManager = fileMapManager;
         this.threadReferenceCounter = 0;
+        this.threadRefCount = 0;
         this.created = false;
         this.flushFlag = false;
 
@@ -96,9 +102,9 @@
             e.printStackTrace();
         }
 
-        memRTree = new RTree(rtreeMemCache, fieldCount, cmp, memFreePageManager, rtreeInteriorFrameFactory,
+        memRTree = new RTree(rtreeMemCache, fieldCount, rtreeCmp, memFreePageManager, rtreeInteriorFrameFactory,
                 rtreeLeafFrameFactory);
-        memBTree = new BTree(rtreeMemCache, fieldCount, cmp, memFreePageManager, btreeInteriorFrameFactory,
+        memBTree = new BTree(rtreeMemCache, fieldCount, btreeCmp, memFreePageManager, btreeInteriorFrameFactory,
                 btreeLeafFrameFactory);
     }
 
@@ -226,13 +232,11 @@
         // RTree.
         IIndexBulkLoadContext rtreeBulkLoadCtx = inDiskRTree.beginBulkLoad(1.0f);
 
-        int i = 0;
         try {
             while (rtreeScanCursor.hasNext()) {
                 rtreeScanCursor.next();
                 ITupleReference frameTuple = rtreeScanCursor.getTuple();
                 inDiskRTree.bulkLoadAddTuple(frameTuple, rtreeBulkLoadCtx);
-                i++;
             }
         } finally {
             rtreeScanCursor.close();
@@ -296,7 +300,7 @@
         memBTree.create(btreeFileId);
     }
 
-    private void decreaseThreadReferenceCounter() throws Exception {
+    public void decreaseThreadReferenceCounter() throws Exception {
         synchronized (this) {
             threadReferenceCounter--;
             if (flushFlag == true) {
@@ -310,144 +314,101 @@
             }
         }
     }
+    
+    public void threadEnter() {
+        threadRefCount++;
+    }
+    
+    public void threadExit() throws Exception {
+        synchronized (this) {
+            threadRefCount--;
+            // Check if we've reached or exceeded the maximum number of pages.
+            if (!flushFlag && memFreePageManager.isFull()) {
+                flushFlag = true;
+            }
+            // Flush will only be handled by last exiting thread.
+            if (flushFlag && threadRefCount == 0) {
+                flush();
+                flushFlag = false;
+            }
+        }
+    }
 
     private void insert(ITupleReference tuple, LSMTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
-        try {
-            boolean continuePerformOp = false;
-            while (continuePerformOp == false) {
-            	synchronized (this) {
-            		if (!flushFlag) {
-            			threadReferenceCounter++;
-            			continuePerformOp = true;
-            		}
-            	}
-            }
-            ctx.LSMRTreeOpContext.memRtreeAccessor.insert(tuple);
-            decreaseThreadReferenceCounter();
-            // Check if we've reached or exceeded the maximum number of pages.
-            // Note: It doesn't matter if this inserter or another concurrent
-            // inserter caused the overflow.
-            // The first inserter reaching this code, should set the flush flag.
-            // Also, setting the flush flag multiple times doesn't hurt.
-            if (memFreePageManager.isFull()) {
-            	synchronized (this) {
-                    // If flushFlag is false it means we are the first inserter
-                    // to
-                    // trigger the flush. If flushFlag is already set to true,
-                    // there's no harm in setting it to true again.
-                    flushFlag = true;
-                    threadReferenceCounter--;
-                    if (threadReferenceCounter == 0) {
-                        flush();
-                        ctx.LSMRTreeOpContext.reset(IndexOp.INSERT);
-                        ctx.LSMRTreeOpContext.memRtreeAccessor.insert(tuple);
-                        flushFlag = false;
-                        return;
-                    } else if (threadReferenceCounter < 0) {
-                        throw new Error("Thread reference counter is below zero. This indicates a programming error!");
-                    }
+        boolean waitForFlush = false;
+        do {
+            synchronized (this) {
+                if (!flushFlag) {
+                    threadEnter();
+                    waitForFlush = false;
                 }
             }
+        } while (waitForFlush == true);
+        ctx.LSMRTreeOpContext.memRTreeAccessor.insert(tuple);
+        try {
+            threadExit();
         } catch (Exception e) {
             e.printStackTrace();
         }
     }
 
     private void delete(ITupleReference tuple, LSMTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
-        try {
-        	boolean continuePerformOp = false;
-        	while (continuePerformOp == false) {
-        		synchronized (this) {
-        			if (!flushFlag) {
-        				threadReferenceCounter++;
-        				continuePerformOp = true;
-        			}
-        		}
-        	}
-        	ctx.LSMBTreeOpContext.memBtreeAccessor.insert(tuple);
-        	decreaseThreadReferenceCounter();
-            // Check if we've reached or exceeded the maximum number of pages.
-            // Note: It doesn't matter if this inserter or another concurrent
-            // inserter caused the overflow.
-            // The first inserter reaching this code, should set the flush flag.
-            // Also, setting the flush flag multiple times doesn't hurt.
-            if (memFreePageManager.isFull()) {
-            	synchronized (this) {
-                    // If flushFlag is false it means we are the first inserter
-                    // to
-                    // trigger the flush. If flushFlag is already set to true,
-                    // there's no harm in setting it to true again.
-                    flushFlag = true;
-                    threadReferenceCounter--;
-                    if (threadReferenceCounter == 0) {
-                        flush();
-                        ctx.LSMBTreeOpContext.reset(IndexOp.INSERT);
-                        ctx.LSMBTreeOpContext.memBtreeAccessor.insert(tuple);
-                        flushFlag = false;
-                        return;
-                    } else if (threadReferenceCounter < 0) {
-                        throw new Error("Thread reference counter is below zero. This indicates a programming error!");
-                    }
+        boolean waitForFlush = false;
+        do {
+            synchronized (this) {
+                if (!flushFlag) {
+                    threadEnter();
+                    waitForFlush = false;
                 }
             }
+        } while (waitForFlush == true);
+        ctx.LSMBTreeOpContext.memBTreeAccessor.insert(tuple);
+        try {
+            threadExit();
         } catch (Exception e) {
             e.printStackTrace();
         }
     }
 
-    private void search(ITreeIndexCursor cursor, ISearchPredicate searchPred, LSMTreeOpContext ctx) throws Exception {
-        // int numberOfInDiskTrees;
-        // ListIterator<InDiskTreeInfo> inDiskTreeInfoListIterator;
-        // boolean continuePerformOp = false;
-        //
-        // ctx.reset(IndexOp.SEARCH);
-        //
-        // while (continuePerformOp == false) {
-        // synchronized (this) {
-        // if (!flushFlag) {
-        // threadReferenceCounter++;
-        // continuePerformOp = true;
-        // }
-        // }
-        // }
-        //
-        // // in-disk
-        // synchronized (inDiskTreeInfoList) {
-        // numberOfInDiskTrees = inDiskTreeInfoList.size();
-        // inDiskTreeInfoListIterator = inDiskTreeInfoList.listIterator();
-        // }
-        //
-        // LSMTreeCursorInitialState initialState = new
-        // LSMTreeCursorInitialState(numberOfInDiskTrees + 1,
-        // insertLeafFrameFactory, cmp, this);
-        // cursor.open(initialState, pred);
-        //
-        // BTree[] onDiskBtrees = new BTree[numberOfInDiskTrees];
-        // ITreeIndexAccessor[] onDiskBtreeAccessors = new
-        // ITreeIndexAccessor[numberOfInDiskTrees];
-        //
-        // for (int i = 0; i < numberOfInDiskTrees; i++) {
-        // // get btree instances for in-disk trees
-        // if (inDiskTreeInfoListIterator.hasNext()) {
-        // onDiskBtrees[i] = ((InDiskTreeInfo)
-        // inDiskTreeInfoListIterator.next()).getBTree();
-        // } else {
-        // throw new HyracksDataException("Cannot find in-disk tree instance");
-        // }
-        // onDiskBtreeAccessors[i] = onDiskBtrees[i].createAccessor();
-        // onDiskBtreeAccessors[i].search(((LSMTreeRangeSearchCursor)
-        // cursor).getCursor(i + 1), pred);
-        // }
-        //
-        // // in-memory
-        // ctx.memBtreeAccessor.search(((LSMTreeRangeSearchCursor)
-        // cursor).getCursor(0), pred);
-        //
-        // LSMPriorityQueueComparator LSMPriorityQueueCmp = new
-        // LSMPriorityQueueComparator(cmp);
-        // ((LSMTreeRangeSearchCursor)
-        // cursor).initPriorityQueue(numberOfInDiskTrees + 1,
-        // LSMPriorityQueueCmp);
+    private void search(ITreeIndexCursor cursor, ISearchPredicate rtreeSearchPred, LSMTreeOpContext ctx)
+            throws Exception {
+
+        boolean continuePerformOp = false;
+        ctx.reset(IndexOp.SEARCH);
+
+        while (continuePerformOp == false) {
+            synchronized (this) {
+                if (!flushFlag) {
+                    threadReferenceCounter++;
+                    continuePerformOp = true;
+                }
+            }
+        }
+
+        int numberOfInDiskTrees;
+        synchronized (inDiskRTreeList) {
+            numberOfInDiskTrees = inDiskRTreeList.size();
+        }
+
+        LSMRTreeCursorInitialState initialState = new LSMRTreeCursorInitialState(numberOfInDiskTrees + 1,
+                rtreeLeafFrameFactory, rtreeInteriorFrameFactory, btreeLeafFrameFactory, btreeCmp,
+                ctx.LSMBTreeOpContext.memBTreeAccessor, this);
+        cursor.open(initialState, rtreeSearchPred);
+
+        ITreeIndexAccessor[] onDiskRTreeAccessors = new ITreeIndexAccessor[numberOfInDiskTrees];
+
+        for (int i = 0; i < numberOfInDiskTrees; i++) {
+            onDiskRTreeAccessors[i] = inDiskRTreeList.get(i).createAccessor();
+            onDiskRTreeAccessors[i].search(((LSMRTreeSearchCursor) cursor).getRTreeCursor(i + 1), rtreeSearchPred);
+        }
+
+        // in-memory
+        ctx.LSMRTreeOpContext.memRTreeAccessor.search(((LSMRTreeSearchCursor) cursor).getRTreeCursor(0),
+                rtreeSearchPred);
+    }
+
+    public LinkedList<ITreeIndex> getInDiskBTreeList() {
+        return inDiskBTreeList;
     }
 
     private LSMTreeOpContext createOpContext() {
@@ -457,7 +418,7 @@
                 (IRTreeInteriorFrame) rtreeInteriorFrameFactory.createFrame(), memFreePageManager
                         .getMetaDataFrameFactory().createFrame(), 8), new LSMBTreeOpContext(
                 (BTree.BTreeAccessor) memBTree.createAccessor(), btreeLeafFrameFactory, btreeInteriorFrameFactory,
-                memFreePageManager.getMetaDataFrameFactory().createFrame(), cmp));
+                memFreePageManager.getMetaDataFrameFactory().createFrame(), btreeCmp));
     }
 
     private class LSMRTreeAccessor implements ITreeIndexAccessor {
@@ -506,3 +467,4 @@
     }
 
 }
+
diff --git a/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTreeCursorInitialState.java b/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTreeCursorInitialState.java
new file mode 100644
index 0000000..3838899
--- /dev/null
+++ b/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTreeCursorInitialState.java
@@ -0,0 +1,69 @@
+package edu.uci.ics.hyracks.storage.am.lsmtree.rtree.impls;
+
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+
+public class LSMRTreeCursorInitialState implements ICursorInitialState {
+
+    private int numberOfTrees;
+    private ITreeIndexFrameFactory rtreeInteriorFrameFactory;
+    private ITreeIndexFrameFactory rtreeLeafFrameFactory;
+    private ITreeIndexFrameFactory btreeLeafFrameFactory;
+    private MultiComparator btreeCmp;
+    private LSMRTree lsmRTree;
+    private BTree.BTreeAccessor memBtreeAccessor;
+
+    public LSMRTreeCursorInitialState(int numberOfTrees, ITreeIndexFrameFactory rtreeLeafFrameFactory,
+            ITreeIndexFrameFactory rtreeInteriorFrameFactory, ITreeIndexFrameFactory btreeLeafFrameFactory,
+            MultiComparator btreeCmp, BTree.BTreeAccessor memBtreeAccessor, LSMRTree lsmRTree) {
+        this.numberOfTrees = numberOfTrees;
+        this.rtreeLeafFrameFactory = rtreeLeafFrameFactory;
+        this.rtreeInteriorFrameFactory = rtreeInteriorFrameFactory;
+        this.btreeLeafFrameFactory = btreeLeafFrameFactory;
+        this.btreeCmp = btreeCmp;
+        this.lsmRTree = lsmRTree;
+        this.memBtreeAccessor = memBtreeAccessor;
+    }
+
+    public int getNumberOfTrees() {
+        return numberOfTrees;
+    }
+
+    public ITreeIndexFrameFactory getRTreeInteriorFrameFactory() {
+        return rtreeInteriorFrameFactory;
+    }
+
+    public ITreeIndexFrameFactory getRTreeLeafFrameFactory() {
+        return rtreeLeafFrameFactory;
+    }
+
+    public ITreeIndexFrameFactory getBTreeLeafFrameFactory() {
+        return btreeLeafFrameFactory;
+    }
+
+    public MultiComparator getBTreeCmp() {
+        return btreeCmp;
+    }
+
+    @Override
+    public ICachedPage getPage() {
+        return null;
+    }
+
+    @Override
+    public void setPage(ICachedPage page) {
+    }
+
+    public BTree.BTreeAccessor getMemBtreeAccessor() {
+        return memBtreeAccessor;
+    }
+
+    public LSMRTree getLsmRTree() {
+        return lsmRTree;
+    }
+
+}
diff --git a/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTreeInMemoryBufferCache.java b/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTreeInMemoryBufferCache.java
index 39a3665..5162c68 100644
--- a/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTreeInMemoryBufferCache.java
+++ b/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTreeInMemoryBufferCache.java
@@ -1,5 +1,7 @@
 package edu.uci.ics.hyracks.storage.am.lsmtree.rtree.impls;
 
+import java.nio.ByteBuffer;
+
 import edu.uci.ics.hyracks.storage.am.lsmtree.common.freepage.InMemoryBufferCache;
 import edu.uci.ics.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
 import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
@@ -16,10 +18,28 @@
         int pageId = BufferedFileHandle.getPageId(dpid);
         int fileId = BufferedFileHandle.getFileId(dpid);
 
-        if (pageId == 0 || pageId == 1) {
-            return pages[pageId + 2 * fileId];
+        if (pageId < pages.length) {
+            // Common case: Return regular page.
+            
+            if (pageId == 0 || pageId == 1) {
+                return pages[pageId + 2 * fileId];
+            } else {
+                return pages[pageId];
+            }
         } else {
-            return pages[pageId];
+            // Rare case: Return overflow page, possibly expanding overflow
+            // array.
+            synchronized (overflowPages) {
+                int numNewPages = pageId - pages.length - overflowPages.size() + 1;
+                if (numNewPages > 0) {
+                    ByteBuffer[] buffers = allocator.allocate(pageSize, numNewPages);
+                    for (int i = 0; i < numNewPages; i++) {
+                        CachedPage overflowPage = new CachedPage(pages.length + overflowPages.size(), buffers[i]);
+                        overflowPages.add(overflowPage);
+                    }
+                }
+                return overflowPages.get(pageId - pages.length);
+            }
         }
     }
 
diff --git a/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTreeInMemoryFreePageManager.java b/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTreeInMemoryFreePageManager.java
index 6a08899..1511e47 100644
--- a/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTreeInMemoryFreePageManager.java
+++ b/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTreeInMemoryFreePageManager.java
@@ -7,8 +7,12 @@
 
 public class LSMRTreeInMemoryFreePageManager extends InMemoryFreePageManager {
 
-    public LSMRTreeInMemoryFreePageManager(int maxCapacity, ITreeIndexMetaDataFrameFactory metaDataFrameFactory) {
-        super(maxCapacity, metaDataFrameFactory);
+    public LSMRTreeInMemoryFreePageManager(int capacity, ITreeIndexMetaDataFrameFactory metaDataFrameFactory) {
+        super(capacity, metaDataFrameFactory);
+        // We start the currentPageId from 3, because the RTree uses
+        // the first page as metadata page, and the second page as root page.
+        // And the BTree uses the third page as metadata, and the third page as root page 
+        // (when returning free pages we first increment, then get)
         currentPageId.set(3);
     }
 
@@ -17,6 +21,10 @@
         currentPageId.set(3);
     }
 
+    public int getCapacity() {
+        return capacity - 4;
+    }
+    
     public void reset() {
         currentPageId.set(3);
     }
diff --git a/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTreeOpContext.java b/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTreeOpContext.java
index a3df90f..57a0413 100644
--- a/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTreeOpContext.java
+++ b/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTreeOpContext.java
@@ -9,16 +9,16 @@
 
 public final class LSMRTreeOpContext extends RTreeOpContext {
     
-    public final RTree.RTreeAccessor memRtreeAccessor;
+    public final RTree.RTreeAccessor memRTreeAccessor;
 
     public LSMRTreeOpContext(RTree.RTreeAccessor memRtreeAccessor, IRTreeLeafFrame leafFrame,
             IRTreeInteriorFrame interiorFrame, ITreeIndexMetaDataFrame metaFrame, int treeHeightHint) {
 
         super(leafFrame, interiorFrame, metaFrame, treeHeightHint);
 
-        this.memRtreeAccessor = memRtreeAccessor;
+        this.memRTreeAccessor = memRtreeAccessor;
         // Overwrite the RTree accessor's op context with our LSMRTreeOpContext.
-        this.memRtreeAccessor.setOpContext(this);
+        this.memRTreeAccessor.setOpContext(this);
 
         reset(op);
     }
diff --git a/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTreeSearchCursor.java b/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTreeSearchCursor.java
new file mode 100644
index 0000000..17f2d95
--- /dev/null
+++ b/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTreeSearchCursor.java
@@ -0,0 +1,162 @@
+package edu.uci.ics.hyracks.storage.am.lsmtree.rtree.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.RTreeSearchCursor;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+
+public class LSMRTreeSearchCursor implements ITreeIndexCursor {
+
+    private BTreeRangeSearchCursor[] btreeCursors;
+    private RTreeSearchCursor[] rtreeCursors;
+    private BTree.BTreeAccessor memBtreeAccessor;
+    private ITreeIndexAccessor[] onDiskBTreeAccessors;
+    private int currentCursror;
+    private MultiComparator btreeCmp;
+    private int numberOfTrees;
+    private LSMRTree lsmRTree;
+    private RangePredicate btreeRangePredicate;
+    private ITupleReference frameTuple;
+
+    public LSMRTreeSearchCursor() {
+        currentCursror = 0;
+    }
+
+    public RTreeSearchCursor getRTreeCursor(int cursorIndex) {
+        return rtreeCursors[cursorIndex];
+    }
+
+    @Override
+    public void reset() {
+        // do nothing
+    }
+
+    @Override
+    public boolean hasNext() throws Exception {
+        for (int i = currentCursror; i < numberOfTrees; i++) {
+            while (rtreeCursors[i].hasNext()) {
+                rtreeCursors[i].next();
+                ITupleReference currentTuple = rtreeCursors[i].getTuple();
+                boolean killerTupleFound = false;
+                for (int j = 0; j <= i; j++) {
+                    btreeRangePredicate.setHighKey(currentTuple, true);
+                    btreeRangePredicate.setLowKey(currentTuple, true);
+
+                    if (j == 0) {
+                        memBtreeAccessor.search(btreeCursors[j], btreeRangePredicate);
+                    } else {
+                        onDiskBTreeAccessors[j - 1].search(btreeCursors[j], btreeRangePredicate);
+                    }
+
+                    if (btreeCursors[j].hasNext()) {
+                        killerTupleFound = true;
+                        break;
+                    }
+                }
+                if (!killerTupleFound) {
+                    frameTuple = currentTuple;
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public void next() throws Exception {
+        while (true) {
+            if (currentCursror < numberOfTrees || rtreeCursors[currentCursror].hasNext()) {
+                break;
+            } else {
+                currentCursror++;
+            }
+        }
+    }
+
+    @Override
+    public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
+
+        lsmRTree = ((LSMRTreeCursorInitialState) initialState).getLsmRTree();
+        btreeCmp = ((LSMRTreeCursorInitialState) initialState).getBTreeCmp();
+        memBtreeAccessor = ((LSMRTreeCursorInitialState) initialState).getMemBtreeAccessor();
+
+        numberOfTrees = ((LSMRTreeCursorInitialState) initialState).getNumberOfTrees();
+
+        rtreeCursors = new RTreeSearchCursor[numberOfTrees];
+        btreeCursors = new BTreeRangeSearchCursor[numberOfTrees];
+
+        onDiskBTreeAccessors = new ITreeIndexAccessor[numberOfTrees - 1];
+
+        for (int i = 0; i < numberOfTrees; i++) {
+            if (i < ((LSMRTreeCursorInitialState) initialState).getNumberOfTrees() - 1) { // we
+                                                                                          // already
+                                                                                          // have
+                                                                                          // an
+                                                                                          // accessor
+                                                                                          // for
+                                                                                          // the
+                                                                                          // in-memory
+                                                                                          // b-tree
+                onDiskBTreeAccessors[i] = lsmRTree.getInDiskBTreeList().get(i).createAccessor();
+            }
+
+            rtreeCursors[i] = new RTreeSearchCursor((IRTreeInteriorFrame) ((LSMRTreeCursorInitialState) initialState)
+                    .getRTreeInteriorFrameFactory().createFrame(),
+                    (IRTreeLeafFrame) ((LSMRTreeCursorInitialState) initialState).getRTreeLeafFrameFactory()
+                            .createFrame());
+
+            btreeCursors[i] = new BTreeRangeSearchCursor((IBTreeLeafFrame) ((LSMRTreeCursorInitialState) initialState)
+                    .getBTreeLeafFrameFactory().createFrame(), false);
+        }
+        btreeRangePredicate = new RangePredicate(true, null, null, true, true, btreeCmp, btreeCmp);
+    }
+
+    @Override
+    public ICachedPage getPage() {
+        // do nothing
+        return null;
+    }
+
+    @Override
+    public void close() throws Exception {
+        lsmRTree.decreaseThreadReferenceCounter();
+        for (int i = 0; i < numberOfTrees; i++) {
+            rtreeCursors[i].close();
+            btreeCursors[i].close();
+        }
+        rtreeCursors = null;
+        btreeCursors = null;
+    }
+
+    @Override
+    public void setBufferCache(IBufferCache bufferCache) {
+        // do nothing
+    }
+
+    @Override
+    public void setFileId(int fileId) {
+        // do nothing
+    }
+
+    @Override
+    public ITupleReference getTuple() {
+        return frameTuple;
+    }
+
+    @Override
+    public boolean exclusiveLatchNodes() {
+        return false;
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsmtree-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/LSMRTreeTest.java b/hyracks-tests/hyracks-storage-am-lsmtree-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/LSMRTreeTest.java
index 0a412b3..cb21305 100644
--- a/hyracks-tests/hyracks-storage-am-lsmtree-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/LSMRTreeTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsmtree-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/LSMRTreeTest.java
@@ -1,8 +1,12 @@
 package edu.uci.ics.hyracks.storage.am.lsmtree.rtree;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.File;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Random;
 import java.util.logging.Level;
 
@@ -22,12 +26,14 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
@@ -39,10 +45,12 @@
 import edu.uci.ics.hyracks.storage.am.lsmtree.rtree.impls.LSMRTree;
 import edu.uci.ics.hyracks.storage.am.lsmtree.rtree.impls.LSMRTreeInMemoryBufferCacheFactory;
 import edu.uci.ics.hyracks.storage.am.lsmtree.rtree.impls.LSMRTreeInMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsmtree.rtree.impls.LSMRTreeSearchCursor;
 import edu.uci.ics.hyracks.storage.am.lsmtree.rtree.impls.LSMTypeAwareTupleWriterFactory;
 import edu.uci.ics.hyracks.storage.am.lsmtree.rtree.impls.RTreeFactory;
 import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreeNSMInteriorFrameFactory;
 import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreeNSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.SearchPredicate;
 import edu.uci.ics.hyracks.storage.am.rtree.util.RTreeUtils;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapManager;
@@ -53,7 +61,7 @@
 public class LSMRTreeTest extends AbstractLSMTreeTest {
 
     private static final int PAGE_SIZE = 256;
-    private static final int NUM_PAGES = 100;
+    private static final int NUM_PAGES = 1000;
     private static final int MAX_OPEN_FILES = 100;
     private static final int HYRACKS_FRAME_SIZE = 128;
     private IHyracksTaskContext ctx = TestUtils.create(HYRACKS_FRAME_SIZE);
@@ -75,13 +83,24 @@
                 NUM_PAGES);
         IBufferCache memBufferCache = inMemBufferCacheFactory.createInMemoryBufferCache();
 
-        // declare keys
-        int keyFieldCount = 4;
-        IBinaryComparator[] cmps = new IBinaryComparator[keyFieldCount];
-        cmps[0] = PointableBinaryComparatorFactory.of(DoublePointable.FACTORY).createBinaryComparator();
-        cmps[1] = cmps[0];
-        cmps[2] = cmps[0];
-        cmps[3] = cmps[0];
+        // declare r-tree keys
+        int rtreeKeyFieldCount = 4;
+        IBinaryComparator[] rtreeCmps = new IBinaryComparator[rtreeKeyFieldCount];
+        rtreeCmps[0] = PointableBinaryComparatorFactory.of(DoublePointable.FACTORY).createBinaryComparator();
+        rtreeCmps[1] = rtreeCmps[0];
+        rtreeCmps[2] = rtreeCmps[0];
+        rtreeCmps[3] = rtreeCmps[0];
+
+        // declare b-tree keys
+        int btreeKeyFieldCount = 7;
+        IBinaryComparator[] btreeCmps = new IBinaryComparator[btreeKeyFieldCount];
+        btreeCmps[0] = PointableBinaryComparatorFactory.of(DoublePointable.FACTORY).createBinaryComparator();
+        btreeCmps[1] = btreeCmps[0];
+        btreeCmps[2] = btreeCmps[0];
+        btreeCmps[3] = btreeCmps[0];
+        btreeCmps[4] = btreeCmps[0];
+        btreeCmps[5] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY).createBinaryComparator();
+        btreeCmps[6] = btreeCmps[0];
 
         // declare tuple fields
         int fieldCount = 7;
@@ -94,11 +113,12 @@
         typeTraits[5] = IntegerPointable.TYPE_TRAITS;
         typeTraits[6] = DoublePointable.TYPE_TRAITS;
 
-        MultiComparator cmp = new MultiComparator(cmps);
+        MultiComparator rtreeCmp = new MultiComparator(rtreeCmps);
+        MultiComparator btreeCmp = new MultiComparator(btreeCmps);
 
         // create value providers
         IPrimitiveValueProviderFactory[] valueProviderFactories = RTreeUtils.createPrimitiveValueProviderFactories(
-                cmps.length, DoublePointable.FACTORY);
+                rtreeCmps.length, DoublePointable.FACTORY);
 
         LSMTypeAwareTupleWriterFactory rtreeTupleWriterFactory = new LSMTypeAwareTupleWriterFactory(typeTraits, false);
         LSMTypeAwareTupleWriterFactory btreeTupleWriterFactory = new LSMTypeAwareTupleWriterFactory(typeTraits, true);
@@ -113,21 +133,22 @@
 
         ITreeIndexMetaDataFrameFactory metaFrameFactory = new LIFOMetaDataFrameFactory();
 
-        InMemoryFreePageManager memFreePageManager = new LSMRTreeInMemoryFreePageManager(100, metaFrameFactory);
+        InMemoryFreePageManager memFreePageManager = new LSMRTreeInMemoryFreePageManager(1000, metaFrameFactory);
 
-        LinkedListFreePageManagerFactory freePageManagerFactory = new LinkedListFreePageManagerFactory(bufferCache, metaFrameFactory);
+        LinkedListFreePageManagerFactory freePageManagerFactory = new LinkedListFreePageManagerFactory(bufferCache,
+                metaFrameFactory);
 
-        RTreeFactory rTreeFactory = new RTreeFactory(bufferCache, freePageManagerFactory, cmp, fieldCount,
+        RTreeFactory rTreeFactory = new RTreeFactory(bufferCache, freePageManagerFactory, rtreeCmp, fieldCount,
                 rtreeInteriorFrameFactory, rtreeLeafFrameFactory);
-        BTreeFactory bTreeFactory = new BTreeFactory(bufferCache, freePageManagerFactory, cmp, fieldCount,
+        BTreeFactory bTreeFactory = new BTreeFactory(bufferCache, freePageManagerFactory, btreeCmp, fieldCount,
                 btreeInteriorFrameFactory, btreeLeafFrameFactory);
 
-        LSMRTree lsmrtree = new LSMRTree(memBufferCache, bufferCache, fieldCount, cmp, memFreePageManager,
-                rtreeInteriorFrameFactory, btreeInteriorFrameFactory, rtreeLeafFrameFactory, btreeLeafFrameFactory,
-                rTreeFactory, bTreeFactory, (IFileMapManager) fmp);
+        LSMRTree lsmRTree = new LSMRTree(memBufferCache, bufferCache, fieldCount, rtreeCmp, btreeCmp,
+                memFreePageManager, rtreeInteriorFrameFactory, btreeInteriorFrameFactory, rtreeLeafFrameFactory,
+                btreeLeafFrameFactory, rTreeFactory, bTreeFactory, (IFileMapManager) fmp);
 
-        lsmrtree.create(fileId);
-        lsmrtree.open(fileId);
+        lsmRTree.create(fileId);
+        lsmRTree.open(fileId);
 
         ByteBuffer frame = ctx.allocateFrame();
         FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
@@ -147,7 +168,7 @@
 
         FrameTupleReference tuple = new FrameTupleReference();
 
-        ITreeIndexAccessor lsmTreeAccessor = lsmrtree.createAccessor();
+        ITreeIndexAccessor lsmTreeAccessor = lsmRTree.createAccessor();
 
         Random rnd = new Random();
         rnd.setSeed(50);
@@ -193,6 +214,9 @@
                 // }
             }
 
+            if (i == 1691) {
+                System.out.println();
+            }
             try {
                 lsmTreeAccessor.insert(tuple);
             } catch (TreeIndexException e) {
@@ -201,6 +225,65 @@
             }
         }
 
+        for (int i = 0; i < 50; i++) {
+            double p1x = rnd.nextDouble();
+            double p1y = rnd.nextDouble();
+            double p2x = rnd.nextDouble();
+            double p2y = rnd.nextDouble();
+
+            int pk = rnd.nextInt();
+
+            tb.reset();
+            DoubleSerializerDeserializer.INSTANCE.serialize(Math.min(p1x, p2x), dos);
+            tb.addFieldEndOffset();
+            DoubleSerializerDeserializer.INSTANCE.serialize(Math.min(p1y, p2y), dos);
+            tb.addFieldEndOffset();
+            DoubleSerializerDeserializer.INSTANCE.serialize(Math.max(p1x, p2x), dos);
+            tb.addFieldEndOffset();
+            DoubleSerializerDeserializer.INSTANCE.serialize(Math.max(p1y, p2y), dos);
+            tb.addFieldEndOffset();
+            IntegerSerializerDeserializer.INSTANCE.serialize(pk, dos);
+            tb.addFieldEndOffset();
+
+            appender.reset(frame, true);
+            appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+
+            tuple.reset(accessor, 0);
+
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info(i + " Searching for: " + Math.min(p1x, p2x) + " " + Math.min(p1y, p2y) + " "
+                        + Math.max(p1x, p2x) + " " + Math.max(p1y, p2y));
+            }
+
+            ITreeIndexCursor searchCursor = new LSMRTreeSearchCursor();
+            SearchPredicate searchPredicate = new SearchPredicate(tuple, rtreeCmp);
+
+            lsmTreeAccessor.search(searchCursor, searchPredicate);
+
+            ArrayList<Integer> results = new ArrayList<Integer>();
+            try {
+                while (searchCursor.hasNext()) {
+                    searchCursor.next();
+                    ITupleReference frameTuple = searchCursor.getTuple();
+                    ByteArrayInputStream inStream = new ByteArrayInputStream(frameTuple.getFieldData(4),
+                            frameTuple.getFieldStart(4), frameTuple.getFieldLength(4));
+                    DataInput dataIn = new DataInputStream(inStream);
+                    Integer res = IntegerSerializerDeserializer.INSTANCE.deserialize(dataIn);
+                    results.add(res);
+                    if (results.size() == 1632) {
+                        System.out.println();
+                    }
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            } finally {
+                searchCursor.close();
+            }
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("There are " + results.size() + " objects that satisfy the query");
+            }
+        }
+
         rnd.setSeed(50);
         for (int i = 0; i < 5000; i++) {
 
@@ -249,7 +332,7 @@
             }
         }
 
-        lsmrtree.close();
+        lsmRTree.close();
         bufferCache.closeFile(fileId);
         memBufferCache.close();
     }