- Added LSM-RTree merge operation with correct concurrency managment.
- Bug Fixes and code cleaning.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1083 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java
index 4bccba4..b06b7e7 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java
@@ -6,6 +6,8 @@
 import java.util.concurrent.LinkedBlockingQueue;
 
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
 /**
@@ -70,6 +72,18 @@
             } else {
                 return new IntegerFieldValueGenerator(rnd);
             }
+        } else if (serde instanceof FloatSerializerDeserializer) {
+            if (sorted) {
+                return new SortedFloatFieldValueGenerator();
+            } else {
+                return new FloatFieldValueGenerator(rnd);
+            }
+        } else if (serde instanceof DoubleSerializerDeserializer) {
+            if (sorted) {
+                return new SortedDoubleFieldValueGenerator();
+            } else {
+                return new DoubleFieldValueGenerator(rnd);
+            }
         }
         System.out.println("NULL");
         //if (serde instanceof Integer64SerializerDeserializer) {
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DoubleFieldValueGenerator.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DoubleFieldValueGenerator.java
new file mode 100644
index 0000000..fcac93a
--- /dev/null
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DoubleFieldValueGenerator.java
@@ -0,0 +1,16 @@
+package edu.uci.ics.hyracks.storage.am.common.datagen;
+
+import java.util.Random;
+
+public class DoubleFieldValueGenerator implements IFieldValueGenerator<Double> {
+    protected final Random rnd;
+
+    public DoubleFieldValueGenerator(Random rnd) {
+        this.rnd = rnd;
+    }
+
+    @Override
+    public Double next() {
+        return rnd.nextDouble();
+    }
+}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/FloatFieldValueGenerator.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/FloatFieldValueGenerator.java
new file mode 100644
index 0000000..6f21c77
--- /dev/null
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/FloatFieldValueGenerator.java
@@ -0,0 +1,16 @@
+package edu.uci.ics.hyracks.storage.am.common.datagen;
+
+import java.util.Random;
+
+public class FloatFieldValueGenerator implements IFieldValueGenerator<Float> {
+    protected final Random rnd;
+
+    public FloatFieldValueGenerator(Random rnd) {
+        this.rnd = rnd;
+    }
+
+    @Override
+    public Float next() {
+        return rnd.nextFloat();
+    }
+}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/SortedDoubleFieldValueGenerator.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/SortedDoubleFieldValueGenerator.java
new file mode 100644
index 0000000..4193811
--- /dev/null
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/SortedDoubleFieldValueGenerator.java
@@ -0,0 +1,17 @@
+package edu.uci.ics.hyracks.storage.am.common.datagen;
+
+public class SortedDoubleFieldValueGenerator implements IFieldValueGenerator<Double> {
+    private double val = 0.0d;
+
+    public SortedDoubleFieldValueGenerator() {
+    }
+    
+    public SortedDoubleFieldValueGenerator(double startVal) {
+        val = startVal;
+    }
+    
+    @Override
+    public Double next() {
+        return val++;
+    }
+}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/SortedFloatFieldValueGenerator.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/SortedFloatFieldValueGenerator.java
new file mode 100644
index 0000000..1f6b315
--- /dev/null
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/SortedFloatFieldValueGenerator.java
@@ -0,0 +1,17 @@
+package edu.uci.ics.hyracks.storage.am.common.datagen;
+
+public class SortedFloatFieldValueGenerator implements IFieldValueGenerator<Float> {
+    private float val = 0.0f;
+
+    public SortedFloatFieldValueGenerator() {
+    }
+    
+    public SortedFloatFieldValueGenerator(float startVal) {
+        val = startVal;
+    }
+    
+    @Override
+    public Float next() {
+        return val++;
+    }
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/IExperimentRunner.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/IExperimentRunner.java
new file mode 100644
index 0000000..8b9d6f3
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/IExperimentRunner.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import edu.uci.ics.hyracks.storage.am.common.datagen.DataGenThread;
+
+public interface IExperimentRunner {
+    public static int DEFAULT_MAX_OUTSTANDING = 100000;
+    
+    public void init() throws Exception;
+    
+    public long runExperiment(DataGenThread dataGen, int numThreads) throws Exception;
+    
+    public void reset() throws Exception;
+    
+    public void deinit() throws Exception;
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeBulkLoadContext.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeBulkLoadContext.java
deleted file mode 100644
index 727c211..0000000
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeBulkLoadContext.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoadContext;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
-import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
-
-public class LSMTreeBulkLoadContext implements IIndexBulkLoadContext {
-    private final ITreeIndex tree;
-    private IIndexBulkLoadContext bulkLoadCtx;
-
-    public LSMTreeBulkLoadContext(ITreeIndex tree) {
-        this.tree = tree;
-    }
-
-    public void beginBulkLoad(float fillFactor) throws HyracksDataException, TreeIndexException {
-        bulkLoadCtx = tree.beginBulkLoad(fillFactor);
-    }
-
-    public ITreeIndex getTree() {
-        return tree;
-    }
-
-    public IIndexBulkLoadContext getBulkLoadCtx() {
-        return bulkLoadCtx;
-    }
-}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 7b59f1b..5a9c454 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -2,17 +2,19 @@
 
 import java.io.File;
 import java.io.FilenameFilter;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.ListIterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoadContext;
@@ -28,17 +30,16 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileNameManager;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMTree;
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeBulkLoadContext;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeFactory;
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
-import edu.uci.ics.hyracks.storage.am.rtree.impls.RTreeSearchCursor;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.SearchPredicate;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
 
 public class LSMRTree implements ILSMTree {
+    private static final long AFTER_MERGE_CLEANUP_SLEEP = 100;
 
     // In-memory components.
     private RTree memRTree;
@@ -53,12 +54,8 @@
     private final BTreeFactory diskBTreeFactory;
     private final IBufferCache diskBufferCache;
     private final IFileMapProvider diskFileMapProvider;
-    private LinkedList<RTree> onDiskRTrees = new LinkedList<RTree>();
-    private LinkedList<RTree> mergedRTrees = new LinkedList<RTree>();
-    private LinkedList<BTree> onDiskBTrees = new LinkedList<BTree>();
-    private LinkedList<BTree> mergedBTrees = new LinkedList<BTree>();
-    private int onDiskRTreeCount;
-    private int onDiskBTreeCount;
+    private LinkedList<RTree> diskRTrees = new LinkedList<RTree>();
+    private LinkedList<BTree> diskBTrees = new LinkedList<BTree>();
 
     // Common for in-memory and on-disk components.
     private final ITreeIndexFrameFactory rtreeInteriorFrameFactory;
@@ -71,6 +68,15 @@
     private int threadRefCount;
     private boolean flushFlag;
 
+    // For synchronizing searchers with a concurrent merge.
+    private AtomicBoolean isMerging = new AtomicBoolean(false);
+    private AtomicInteger searcherRefCountA = new AtomicInteger(0);
+    private AtomicInteger searcherRefCountB = new AtomicInteger(0);
+    // Represents the current number of searcher threads that are operating on
+    // the unmerged on-disk RTrees and BTrees.
+    // We alternate between searcherRefCountA and searcherRefCountB.
+    private AtomicInteger searcherRefCount = searcherRefCountA;
+
     public LSMRTree(IBufferCache memBufferCache, InMemoryFreePageManager memFreePageManager,
             ITreeIndexFrameFactory rtreeInteriorFrameFactory, ITreeIndexFrameFactory rtreeLeafFrameFactory,
             ITreeIndexFrameFactory btreeInteriorFrameFactory, ITreeIndexFrameFactory btreeLeafFrameFactory,
@@ -93,8 +99,6 @@
         this.diskRTreeFactory = diskRTreeFactory;
         this.diskBTreeFactory = diskBTreeFactory;
         this.btreeCmp = btreeCmp;
-        this.onDiskRTreeCount = 0;
-        this.onDiskBTreeCount = 0;
         this.threadRefCount = 0;
         this.flushFlag = false;
         this.fileNameManager = fileNameManager;
@@ -103,77 +107,6 @@
     }
 
     @Override
-    public ITreeIndexAccessor createAccessor() {
-        return new LSMRTreeAccessor(this);
-    }
-
-    @Override
-    public IIndexBulkLoadContext beginBulkLoad(float fillFactor) throws TreeIndexException, HyracksDataException {
-        // Note that by using a flush target file name, we state that the new
-        // bulk loaded tree is "newer" than any other merged tree.
-
-        String fileName = fileNameManager.getFlushFileName();
-        RTree diskRTree = (RTree) createDiskTree(fileName + "-rtree", diskRTreeFactory, true);
-        // For each RTree, we require to have a buddy BTree. thus, we create an
-        // empty BTree. This can be optimized later.
-        createDiskTree(fileName + "-btree", diskBTreeFactory, true);
-        LSMTreeBulkLoadContext bulkLoadCtx = new LSMTreeBulkLoadContext(diskRTree);
-        bulkLoadCtx.beginBulkLoad(fillFactor);
-        return bulkLoadCtx;
-    }
-
-    @Override
-    public void bulkLoadAddTuple(ITupleReference tuple, IIndexBulkLoadContext ictx) throws HyracksDataException {
-        LSMTreeBulkLoadContext bulkLoadCtx = (LSMTreeBulkLoadContext) ictx;
-        bulkLoadCtx.getTree().bulkLoadAddTuple(tuple, bulkLoadCtx.getBulkLoadCtx());
-
-    }
-
-    @Override
-    public void endBulkLoad(IIndexBulkLoadContext ictx) throws HyracksDataException {
-        LSMTreeBulkLoadContext bulkLoadCtx = (LSMTreeBulkLoadContext) ictx;
-        bulkLoadCtx.getTree().endBulkLoad(bulkLoadCtx.getBulkLoadCtx());
-        onDiskRTrees.addFirst((RTree) bulkLoadCtx.getTree());
-
-    }
-
-    @Override
-    public ITreeIndexFrameFactory getLeafFrameFactory() {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public ITreeIndexFrameFactory getInteriorFrameFactory() {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public IFreePageManager getFreePageManager() {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public int getFieldCount() {
-        // TODO Auto-generated method stub
-        return 0;
-    }
-
-    @Override
-    public int getRootPageId() {
-        // TODO Auto-generated method stub
-        return 0;
-    }
-
-    @Override
-    public IndexType getIndexType() {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
     public void create(int indexFileId) throws HyracksDataException {
         memRTree.create(rtreeFileId);
         memBTree.create(btreeFileId);
@@ -217,210 +150,108 @@
         if (rtreeFiles == null || btreeFiles == null) {
             return;
         }
-        
+
         Comparator<String> fileNameCmp = fileNameManager.getFileNameComparator();
         Arrays.sort(rtreeFiles, fileNameCmp);
         for (String fileName : rtreeFiles) {
             RTree rtree = (RTree) createDiskTree(fileName, diskRTreeFactory, false);
-            onDiskRTrees.add(rtree);
+            diskRTrees.add(rtree);
         }
 
         Arrays.sort(btreeFiles, fileNameCmp);
         for (String fileName : btreeFiles) {
             BTree btree = (BTree) createDiskTree(fileName, diskBTreeFactory, false);
-            onDiskBTrees.add(btree);
+            diskBTrees.add(btree);
         }
     }
 
     @Override
     public void close() throws HyracksDataException {
-        for (RTree rtree : onDiskRTrees) {
+        for (RTree rtree : diskRTrees) {
             diskBufferCache.closeFile(rtree.getFileId());
             rtree.close();
         }
-        for (BTree btree : onDiskBTrees) {
+        for (BTree btree : diskBTrees) {
             diskBufferCache.closeFile(btree.getFileId());
             btree.close();
         }
-        onDiskRTrees.clear();
-        onDiskBTrees.clear();
-        onDiskRTreeCount = 0;
-        onDiskBTreeCount = 0;
+        diskRTrees.clear();
+        diskBTrees.clear();
         memRTree.close();
         memBTree.close();
     }
 
-    private ITreeIndex createDiskTree(String fileName, TreeFactory diskTreeFactory, boolean createTree)
-            throws HyracksDataException {
-        // Register the new tree file.
-        FileReference file = new FileReference(new File(fileName));
-        // TODO: Delete the file during cleanup.
-        diskBufferCache.createFile(file);
-        int diskTreeFileId = diskFileMapProvider.lookupFileId(file);
-        // TODO: Close the file during cleanup.
-        diskBufferCache.openFile(diskTreeFileId);
-        // Create new tree instance.
-        ITreeIndex diskTree = diskTreeFactory.createIndexInstance(diskTreeFileId);
-        if (createTree) {
-            diskTree.create(diskTreeFileId);
-        }
-        // TODO: Close the tree during cleanup.
-        diskTree.open(diskTreeFileId);
-        return diskTree;
+    @Override
+    public ITreeIndexAccessor createAccessor() {
+        return new LSMRTreeAccessor(this);
     }
 
     @Override
-    public void merge() throws Exception {
-
-        // Cursor setting -- almost the same as search, only difference is
-        // "no cursor for in-memory tree"
-        int numberOfInDiskTrees = onDiskRTrees.size();
-
-        RTreeSearchCursor[] rtreeCursors = new RTreeSearchCursor[numberOfInDiskTrees];
-        BTreeRangeSearchCursor[] btreeCursors = new BTreeRangeSearchCursor[numberOfInDiskTrees];
-
-        for (int i = 0; i < numberOfInDiskTrees; i++) {
-            rtreeCursors[i] = new RTreeSearchCursor((IRTreeInteriorFrame) rtreeInteriorFrameFactory.createFrame(),
-                    (IRTreeLeafFrame) rtreeLeafFrameFactory.createFrame());
-
-            btreeCursors[i] = new BTreeRangeSearchCursor((IBTreeLeafFrame) btreeLeafFrameFactory.createFrame(), false);
-        }
-
-        String fileName = fileNameManager.getMergeFileName();
-        RTree mergedRTree = (RTree) createDiskTree(fileName + "-rtree", diskRTreeFactory, true);
-        BTree mergedBTree = (BTree) createDiskTree(fileName + "-btree", diskBTreeFactory, true);
-
-        // BulkLoad the tuples from the trees into the new merged trees.
-        IIndexBulkLoadContext rtreeBulkLoadCtx = mergedRTree.beginBulkLoad(1.0f);
-        IIndexBulkLoadContext btreeBulkLoadCtx = mergedBTree.beginBulkLoad(1.0f);
-
-        for (int i = 0; i < numberOfInDiskTrees; i++) {
-
-            // scan the RTrees
-            ITreeIndexCursor rtreeScanCursor = new RTreeSearchCursor(
-                    (IRTreeInteriorFrame) rtreeInteriorFrameFactory.createFrame(),
-                    (IRTreeLeafFrame) rtreeLeafFrameFactory.createFrame());
-            SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
-
-            ITreeIndexAccessor onDiskRTreeAccessor = onDiskRTrees.get(i).createAccessor();
-            onDiskRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
-
-            try {
-                while (rtreeScanCursor.hasNext()) {
-                    rtreeScanCursor.next();
-                    ITupleReference frameTuple = rtreeScanCursor.getTuple();
-                    mergedRTree.bulkLoadAddTuple(frameTuple, rtreeBulkLoadCtx);
-                }
-            } finally {
-                rtreeScanCursor.close();
-            }
-
-            // scan the BTrees
-            ITreeIndexCursor btreeScanCursor = new BTreeRangeSearchCursor(
-                    (IBTreeLeafFrame) btreeLeafFrameFactory.createFrame(), false);
-            RangePredicate btreeNullPredicate = new RangePredicate(true, null, null, true, true, null, null);
-            ITreeIndexAccessor onDiskBTreeAccessor = onDiskBTrees.get(i).createAccessor();
-            onDiskBTreeAccessor.search(btreeScanCursor, btreeNullPredicate);
-
-            try {
-                while (btreeScanCursor.hasNext()) {
-                    btreeScanCursor.next();
-                    ITupleReference frameTuple = btreeScanCursor.getTuple();
-                    mergedBTree.bulkLoadAddTuple(frameTuple, btreeBulkLoadCtx);
-                }
-            } finally {
-                btreeScanCursor.close();
-            }
-
-        }
-        mergedRTree.endBulkLoad(rtreeBulkLoadCtx);
-        mergedBTree.endBulkLoad(btreeBulkLoadCtx);
-
-        // TODO: complete the merge code
-
-    }
-
-    @Override
-    public void flush() throws HyracksDataException, TreeIndexException {
-
-        // scan the RTree
-        ITreeIndexCursor rtreeScanCursor = new RTreeSearchCursor(
-                (IRTreeInteriorFrame) rtreeInteriorFrameFactory.createFrame(),
-                (IRTreeLeafFrame) rtreeLeafFrameFactory.createFrame());
-        SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
-
-        ITreeIndexAccessor memRTreeAccessor = memRTree.createAccessor();
-        memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
+    public IIndexBulkLoadContext beginBulkLoad(float fillFactor) throws TreeIndexException, HyracksDataException {
+        // Note that by using a flush target file name, we state that the new
+        // bulk loaded tree is "newer" than any other merged tree.
 
         String fileName = fileNameManager.getFlushFileName();
         RTree diskRTree = (RTree) createDiskTree(fileName + "-rtree", diskRTreeFactory, true);
-
-        // BulkLoad the tuples from the in-memory tree into the new disk RTree.
-        IIndexBulkLoadContext rtreeBulkLoadCtx = diskRTree.beginBulkLoad(1.0f);
-
-        try {
-            while (rtreeScanCursor.hasNext()) {
-                rtreeScanCursor.next();
-                ITupleReference frameTuple = rtreeScanCursor.getTuple();
-                diskRTree.bulkLoadAddTuple(frameTuple, rtreeBulkLoadCtx);
-            }
-        } finally {
-            rtreeScanCursor.close();
-        }
-        diskRTree.endBulkLoad(rtreeBulkLoadCtx);
-
-        // scan the BTree
-        ITreeIndexCursor btreeScanCursor = new BTreeRangeSearchCursor(
-                (IBTreeLeafFrame) btreeLeafFrameFactory.createFrame(), false);
-        RangePredicate btreeNullPredicate = new RangePredicate(true, null, null, true, true, null, null);
-        ITreeIndexAccessor memBTreeAccessor = memBTree.createAccessor();
-        memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate);
-
+        // For each RTree, we require to have a buddy BTree. thus, we create an
+        // empty BTree. This can be optimized later.
         BTree diskBTree = (BTree) createDiskTree(fileName + "-btree", diskBTreeFactory, true);
+        LSMRTreeBulkLoadContext bulkLoadCtx = new LSMRTreeBulkLoadContext(diskRTree, diskBTree);
+        bulkLoadCtx.beginBulkLoad(fillFactor);
+        return bulkLoadCtx;
+    }
 
-        // BulkLoad the tuples from the in-memory tree into the new disk BTree.
-        IIndexBulkLoadContext btreeBulkLoadCtx = diskBTree.beginBulkLoad(1.0f);
-        try {
-            while (btreeScanCursor.hasNext()) {
-                btreeScanCursor.next();
-                ITupleReference frameTuple = btreeScanCursor.getTuple();
-                diskBTree.bulkLoadAddTuple(frameTuple, btreeBulkLoadCtx);
-            }
-        } finally {
-            btreeScanCursor.close();
+    @Override
+    public void bulkLoadAddTuple(ITupleReference tuple, IIndexBulkLoadContext ictx) throws HyracksDataException {
+        LSMRTreeBulkLoadContext bulkLoadCtx = (LSMRTreeBulkLoadContext) ictx;
+        bulkLoadCtx.getRTree().bulkLoadAddTuple(tuple, bulkLoadCtx.getBulkLoadCtx());
+
+    }
+
+    @Override
+    public void endBulkLoad(IIndexBulkLoadContext ictx) throws HyracksDataException {
+        LSMRTreeBulkLoadContext bulkLoadCtx = (LSMRTreeBulkLoadContext) ictx;
+        bulkLoadCtx.getRTree().endBulkLoad(bulkLoadCtx.getBulkLoadCtx());
+        synchronized (diskRTrees) {
+            diskRTrees.addFirst(bulkLoadCtx.getRTree());
+            diskBTrees.addFirst(bulkLoadCtx.getBTree());
         }
-        diskBTree.endBulkLoad(btreeBulkLoadCtx);
-
-        resetInMemoryTrees();
-
-        onDiskRTrees.addFirst(diskRTree);
-        onDiskBTrees.addFirst(diskBTree);
-
     }
 
-    public void resetInMemoryTrees() throws HyracksDataException {
-        memFreePageManager.reset();
-        memRTree.create(rtreeFileId);
-        memBTree.create(btreeFileId);
+    @Override
+    public ITreeIndexFrameFactory getLeafFrameFactory() {
+        // TODO Auto-generated method stub
+        return null;
     }
 
-    public void threadEnter() {
-        threadRefCount++;
+    @Override
+    public ITreeIndexFrameFactory getInteriorFrameFactory() {
+        // TODO Auto-generated method stub
+        return null;
     }
 
-    public void threadExit() throws HyracksDataException, TreeIndexException {
-        synchronized (this) {
-            threadRefCount--;
-            // Check if we've reached or exceeded the maximum number of pages.
-            if (!flushFlag && memFreePageManager.isFull()) {
-                flushFlag = true;
-            }
-            // Flush will only be handled by last exiting thread.
-            if (flushFlag && threadRefCount == 0) {
-                flush();
-                flushFlag = false;
-            }
-        }
+    @Override
+    public IFreePageManager getFreePageManager() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public int getFieldCount() {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public int getRootPageId() {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public IndexType getIndexType() {
+        // TODO Auto-generated method stub
+        return null;
     }
 
     private void insert(ITupleReference tuple, LSMRTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
@@ -459,25 +290,50 @@
         }
     }
 
-    private void search(ITreeIndexCursor cursor, ISearchPredicate rtreeSearchPred, LSMRTreeOpContext ctx,
-            boolean includeMemTree) throws Exception {
-
-        boolean continuePerformOp = false;
-        ctx.reset(IndexOp.SEARCH);
-        while (continuePerformOp == false) {
-            synchronized (this) {
-                if (!flushFlag) {
-                    threadRefCount++;
-                    continuePerformOp = true;
+    private Pair<List<RTree>, List<BTree>> search(ITreeIndexCursor cursor, ISearchPredicate rtreeSearchPred,
+            LSMRTreeOpContext ctx, boolean includeMemRTree) throws HyracksDataException, TreeIndexException {
+        // If the search doesn't include the in-memory RTree, then we don't have
+        // to synchronize with a flush.
+        if (includeMemRTree) {
+            boolean waitForFlush = true;
+            do {
+                synchronized (this) {
+                    if (!flushFlag) {
+                        // The corresponding threadExit() is in
+                        // LSMTreeRangeSearchCursor.close().
+                        threadEnter();
+                        waitForFlush = false;
+                    }
                 }
+            } while (waitForFlush);
+        }
+
+        // Get a snapshot of the current on-disk RTrees and BTrees.
+        // If includeMemRTree is true, then no concurrent
+        // flush can add another on-disk RTree (due to threadEnter());
+        // If includeMemRTree is false, then it is possible that a concurrent
+        // flush adds another on-disk RTree.
+        // Since this mode is only used for merging trees, it doesn't really
+        // matter if the merge excludes the new on-disk RTree.
+        List<RTree> diskRTreesSnapshot = new ArrayList<RTree>();
+        List<BTree> diskBTreesSnapshot = new ArrayList<BTree>();
+        AtomicInteger localSearcherRefCount = null;
+        synchronized (diskRTrees) {
+            diskRTreesSnapshot.addAll(diskRTrees);
+            diskBTreesSnapshot.addAll(diskBTrees);
+            // Only remember the search ref count when performing a merge (i.e.,
+            // includeMemRTree is false).
+            if (!includeMemRTree) {
+                localSearcherRefCount = searcherRefCount;
+                localSearcherRefCount.incrementAndGet();
             }
         }
 
-        int numDiskTrees = onDiskRTrees.size();
+        int numDiskTrees = diskRTreesSnapshot.size();
 
         ITreeIndexAccessor[] bTreeAccessors;
         int diskBTreeIx = 0;
-        if (includeMemTree) {
+        if (includeMemRTree) {
             bTreeAccessors = new ITreeIndexAccessor[numDiskTrees + 1];
             bTreeAccessors[0] = ctx.memBTreeAccessor;
             diskBTreeIx++;
@@ -485,7 +341,7 @@
             bTreeAccessors = new ITreeIndexAccessor[numDiskTrees];
         }
 
-        ListIterator<BTree> diskBTreesIter = onDiskBTrees.listIterator();
+        ListIterator<BTree> diskBTreesIter = diskBTreesSnapshot.listIterator();
         while (diskBTreesIter.hasNext()) {
             BTree diskBTree = diskBTreesIter.next();
             bTreeAccessors[diskBTreeIx] = diskBTree.createAccessor();
@@ -494,11 +350,12 @@
 
         LSMRTreeSearchCursor lsmRTreeCursor = (LSMRTreeSearchCursor) cursor;
         LSMRTreeCursorInitialState initialState = new LSMRTreeCursorInitialState(numDiskTrees + 1,
-                rtreeLeafFrameFactory, rtreeInteriorFrameFactory, btreeLeafFrameFactory, btreeCmp, bTreeAccessors, this);
+                rtreeLeafFrameFactory, rtreeInteriorFrameFactory, btreeLeafFrameFactory, btreeCmp, bTreeAccessors,
+                this, includeMemRTree, localSearcherRefCount);
         lsmRTreeCursor.open(initialState, rtreeSearchPred);
 
         int cursorIx = 1;
-        if (includeMemTree) {
+        if (includeMemRTree) {
             ctx.memRTreeAccessor.search(((LSMRTreeSearchCursor) lsmRTreeCursor).getCursor(0), rtreeSearchPred);
             cursorIx = 1;
         } else {
@@ -507,7 +364,7 @@
 
         // Open cursors of on-disk RTrees
         ITreeIndexAccessor[] diskRTreeAccessors = new ITreeIndexAccessor[numDiskTrees];
-        ListIterator<RTree> diskRTreesIter = onDiskRTrees.listIterator();
+        ListIterator<RTree> diskRTreesIter = diskRTreesSnapshot.listIterator();
 
         int diskRTreeIx = 0;
         while (diskRTreesIter.hasNext()) {
@@ -517,11 +374,195 @@
             cursorIx++;
             diskRTreeIx++;
         }
+        return new Pair<List<RTree>, List<BTree>>(diskRTreesSnapshot, diskBTreesSnapshot);
 
     }
 
-    public LinkedList<BTree> getOnDiskBTrees() {
-        return onDiskBTrees;
+    private ITreeIndex createDiskTree(String fileName, TreeFactory diskTreeFactory, boolean createTree)
+            throws HyracksDataException {
+        // Register the new tree file.
+        FileReference file = new FileReference(new File(fileName));
+        // TODO: Delete the file during cleanup.
+        diskBufferCache.createFile(file);
+        int diskTreeFileId = diskFileMapProvider.lookupFileId(file);
+        // TODO: Close the file during cleanup.
+        diskBufferCache.openFile(diskTreeFileId);
+        // Create new tree instance.
+        ITreeIndex diskTree = diskTreeFactory.createIndexInstance(diskTreeFileId);
+        if (createTree) {
+            diskTree.create(diskTreeFileId);
+        }
+        // TODO: Close the tree during cleanup.
+        diskTree.open(diskTreeFileId);
+        return diskTree;
+    }
+
+    @Override
+    public void flush() throws HyracksDataException, TreeIndexException {
+
+        // scan the RTree
+        ITreeIndexAccessor memRTreeAccessor = memRTree.createAccessor();
+        ITreeIndexCursor rtreeScanCursor = memRTreeAccessor.createSearchCursor();
+        SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
+        memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
+
+        String fileName = fileNameManager.getFlushFileName();
+        RTree diskRTree = (RTree) createDiskTree(fileName + "-rtree", diskRTreeFactory, true);
+
+        // BulkLoad the tuples from the in-memory tree into the new disk RTree.
+        IIndexBulkLoadContext rtreeBulkLoadCtx = diskRTree.beginBulkLoad(1.0f);
+
+        try {
+            while (rtreeScanCursor.hasNext()) {
+                rtreeScanCursor.next();
+                ITupleReference frameTuple = rtreeScanCursor.getTuple();
+                diskRTree.bulkLoadAddTuple(frameTuple, rtreeBulkLoadCtx);
+            }
+        } finally {
+            rtreeScanCursor.close();
+        }
+        diskRTree.endBulkLoad(rtreeBulkLoadCtx);
+
+        // scan the BTree
+        ITreeIndexAccessor memBTreeAccessor = memBTree.createAccessor();
+        ITreeIndexCursor btreeScanCursor = memBTreeAccessor.createSearchCursor();
+        RangePredicate btreeNullPredicate = new RangePredicate(true, null, null, true, true, null, null);
+        memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate);
+
+        BTree diskBTree = (BTree) createDiskTree(fileName + "-btree", diskBTreeFactory, true);
+
+        // BulkLoad the tuples from the in-memory tree into the new disk BTree.
+        IIndexBulkLoadContext btreeBulkLoadCtx = diskBTree.beginBulkLoad(1.0f);
+        try {
+            while (btreeScanCursor.hasNext()) {
+                btreeScanCursor.next();
+                ITupleReference frameTuple = btreeScanCursor.getTuple();
+                diskBTree.bulkLoadAddTuple(frameTuple, btreeBulkLoadCtx);
+            }
+        } finally {
+            btreeScanCursor.close();
+        }
+        diskBTree.endBulkLoad(btreeBulkLoadCtx);
+
+        resetInMemoryTrees();
+
+        synchronized (diskRTrees) {
+            diskRTrees.addFirst(diskRTree);
+            diskBTrees.addFirst(diskBTree);
+        }
+    }
+
+    @Override
+    public void merge() throws HyracksDataException, TreeIndexException {
+        if (isMerging.get()) {
+            throw new TreeIndexException("Merge already in progress in LSM-RTree. Only one concurrent merge allowed.");
+        }
+        isMerging.set(true);
+
+        // Point to the current searcher ref count, so we can wait for it later
+        // (after we swap the searcher ref count).
+        AtomicInteger localSearcherRefCount = searcherRefCount;
+
+        LSMRTreeOpContext ctx = createOpContext();
+        ITreeIndexCursor cursor = new LSMRTreeSearchCursor();
+        SearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
+        // Scan the RTrees, ignoring the in-memory RTree.
+        Pair<List<RTree>, List<BTree>> mergingDiskTreesPair = search(cursor, rtreeSearchPred, ctx, false);
+        List<RTree> mergingDiskRTrees = mergingDiskTreesPair.getFirst();
+        List<BTree> mergingDiskBTrees = mergingDiskTreesPair.getSecond();
+
+        // Bulk load the tuples from all on-disk RTrees into the new RTree.
+        String fileName = fileNameManager.getMergeFileName();
+        RTree mergedRTree = (RTree) createDiskTree(fileName + "-rtree", diskRTreeFactory, true);
+        BTree mergedBTree = (BTree) createDiskTree(fileName + "-btree", diskBTreeFactory, true);
+
+        IIndexBulkLoadContext bulkLoadCtx = mergedRTree.beginBulkLoad(1.0f);
+        try {
+            while (cursor.hasNext()) {
+                cursor.next();
+                ITupleReference frameTuple = cursor.getTuple();
+                mergedRTree.bulkLoadAddTuple(frameTuple, bulkLoadCtx);
+            }
+        } finally {
+            cursor.close();
+        }
+        mergedRTree.endBulkLoad(bulkLoadCtx);
+
+        // Remove the old RTrees and BTrees from the list, and add the new
+        // merged RTree and an empty BTree
+        // Also, swap the searchRefCount.
+        synchronized (diskRTrees) {
+            diskRTrees.removeAll(mergingDiskRTrees);
+            diskRTrees.addLast(mergedRTree);
+
+            diskBTrees.removeAll(mergingDiskBTrees);
+            diskBTrees.addLast(mergedBTree);
+            // Swap the searcher ref count reference, and reset it to zero.
+            if (searcherRefCount == searcherRefCountA) {
+                searcherRefCount = searcherRefCountB;
+            } else {
+                searcherRefCount = searcherRefCountA;
+            }
+            searcherRefCount.set(0);
+        }
+
+        // Wait for all searchers that are still accessing the old on-disk
+        // RTrees and BTrees, then perform the final cleanup of the old RTrees
+        // and BTrees.
+        while (localSearcherRefCount.get() != 0) {
+            try {
+                Thread.sleep(AFTER_MERGE_CLEANUP_SLEEP);
+            } catch (InterruptedException e) {
+                // Propagate the exception to the caller, so that an appropriate
+                // cleanup action can be taken.
+                throw new HyracksDataException(e);
+            }
+        }
+
+        // Cleanup. At this point we have guaranteed that no searchers are
+        // touching the old on-disk RTrees and BTrees (localSearcherRefCount ==
+        // 0).
+        for (RTree oldRTree : mergingDiskRTrees) {
+            oldRTree.close();
+            FileReference fileRef = diskFileMapProvider.lookupFileName(oldRTree.getFileId());
+            diskBufferCache.closeFile(oldRTree.getFileId());
+            diskBufferCache.deleteFile(oldRTree.getFileId());
+            fileRef.getFile().delete();
+        }
+        for (BTree oldBTree : mergingDiskBTrees) {
+            oldBTree.close();
+            FileReference fileRef = diskFileMapProvider.lookupFileName(oldBTree.getFileId());
+            diskBufferCache.closeFile(oldBTree.getFileId());
+            diskBufferCache.deleteFile(oldBTree.getFileId());
+            fileRef.getFile().delete();
+        }
+        isMerging.set(false);
+
+    }
+
+    public void resetInMemoryTrees() throws HyracksDataException {
+        memFreePageManager.reset();
+        memRTree.create(rtreeFileId);
+        memBTree.create(btreeFileId);
+    }
+
+    public void threadEnter() {
+        threadRefCount++;
+    }
+
+    public void threadExit() throws HyracksDataException, TreeIndexException {
+        synchronized (this) {
+            threadRefCount--;
+            // Check if we've reached or exceeded the maximum number of pages.
+            if (!flushFlag && memFreePageManager.isFull()) {
+                flushFlag = true;
+            }
+            // Flush will only be handled by last exiting thread.
+            if (flushFlag && threadRefCount == 0) {
+                flush();
+                flushFlag = false;
+            }
+        }
     }
 
     private LSMRTreeOpContext createOpContext() {
@@ -582,7 +623,7 @@
         public ITreeIndexCursor createDiskOrderScanCursor() {
             throw new UnsupportedOperationException("DiskOrderScan not supported by LSM-RTree.");
         }
-        
+
         @Override
         public void diskOrderScan(ITreeIndexCursor cursor) throws HyracksDataException {
             throw new UnsupportedOperationException("DiskOrderScan not supported by LSM-RTree.");
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoadContext.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoadContext.java
new file mode 100644
index 0000000..8fefd07
--- /dev/null
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoadContext.java
@@ -0,0 +1,34 @@
+package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoadContext;
+import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
+
+public class LSMRTreeBulkLoadContext implements IIndexBulkLoadContext {
+    private final RTree rtree;
+    private final BTree btree;
+    private IIndexBulkLoadContext bulkLoadCtx;
+
+    public LSMRTreeBulkLoadContext(RTree rtree, BTree btree) {
+        this.rtree = rtree;
+        this.btree = btree;
+    }
+
+    public void beginBulkLoad(float fillFactor) throws HyracksDataException, TreeIndexException {
+        bulkLoadCtx = rtree.beginBulkLoad(fillFactor);
+    }
+
+    public RTree getRTree() {
+        return rtree;
+    }
+
+    public BTree getBTree() {
+        return btree;
+    }
+
+    public IIndexBulkLoadContext getBulkLoadCtx() {
+        return bulkLoadCtx;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeCursorInitialState.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeCursorInitialState.java
index 7239f3a..aaca86a 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeCursorInitialState.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeCursorInitialState.java
@@ -1,5 +1,7 @@
 package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
@@ -15,10 +17,13 @@
     private MultiComparator btreeCmp;
     private LSMRTree lsmRTree;
     private ITreeIndexAccessor[] bTreeAccessors;
+    private final boolean includeMemRTree;
+    private final AtomicInteger searcherRefCount;
 
     public LSMRTreeCursorInitialState(int numberOfTrees, ITreeIndexFrameFactory rtreeLeafFrameFactory,
             ITreeIndexFrameFactory rtreeInteriorFrameFactory, ITreeIndexFrameFactory btreeLeafFrameFactory,
-            MultiComparator btreeCmp, ITreeIndexAccessor[] bTreeAccessors, LSMRTree lsmRTree) {
+            MultiComparator btreeCmp, ITreeIndexAccessor[] bTreeAccessors, LSMRTree lsmRTree, boolean includeMemRTree,
+            AtomicInteger searcherRefCount) {
         this.numberOfTrees = numberOfTrees;
         this.rtreeLeafFrameFactory = rtreeLeafFrameFactory;
         this.rtreeInteriorFrameFactory = rtreeInteriorFrameFactory;
@@ -26,6 +31,8 @@
         this.btreeCmp = btreeCmp;
         this.lsmRTree = lsmRTree;
         this.bTreeAccessors = bTreeAccessors;
+        this.includeMemRTree = includeMemRTree;
+        this.searcherRefCount = searcherRefCount;
     }
 
     public int getNumberOfTrees() {
@@ -65,4 +72,12 @@
         return lsmRTree;
     }
 
+    public boolean getIncludeMemRTree() {
+        return includeMemRTree;
+    }
+
+    public AtomicInteger getSearcherRefCount() {
+        return searcherRefCount;
+    }
+
 }
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
index 4cf18c4..e2642fb 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
@@ -2,6 +2,7 @@
 
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeOpContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
@@ -11,7 +12,7 @@
 import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.RTreeOpContext;
 
-public final class LSMRTreeOpContext {
+public final class LSMRTreeOpContext implements IIndexOpContext {
 
     private RTreeOpContext rtreeOpContext;
     private BTreeOpContext btreeOpContext;
@@ -39,4 +40,9 @@
         }
     }
 
+    @Override
+    public void reset() {
+
+    }
+
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
index 98de361..bf9af21 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
@@ -1,5 +1,7 @@
 package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
@@ -28,6 +30,8 @@
     private LSMRTree lsmRTree;
     private RangePredicate btreeRangePredicate;
     private ITupleReference frameTuple;
+    private boolean includeMemRTree;
+    private AtomicInteger searcherRefCount;
 
     public LSMRTreeSearchCursor() {
         currentCursror = 0;
@@ -86,25 +90,24 @@
 
     @Override
     public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
-
-        lsmRTree = ((LSMRTreeCursorInitialState) initialState).getLsmRTree();
-        btreeCmp = ((LSMRTreeCursorInitialState) initialState).getBTreeCmp();
-
-        numberOfTrees = ((LSMRTreeCursorInitialState) initialState).getNumberOfTrees();
-
-        diskBTreeAccessors = ((LSMRTreeCursorInitialState) initialState).getBTreeAccessors();
+        LSMRTreeCursorInitialState lsmInitialState = (LSMRTreeCursorInitialState) initialState;
+        lsmRTree = lsmInitialState.getLsmRTree();
+        btreeCmp = lsmInitialState.getBTreeCmp();
+        includeMemRTree = lsmInitialState.getIncludeMemRTree();
+        searcherRefCount = lsmInitialState.getSearcherRefCount();
+        numberOfTrees = lsmInitialState.getNumberOfTrees();
+        diskBTreeAccessors = lsmInitialState.getBTreeAccessors();
 
         rtreeCursors = new RTreeSearchCursor[numberOfTrees];
         btreeCursors = new BTreeRangeSearchCursor[numberOfTrees];
 
         for (int i = 0; i < numberOfTrees; i++) {
-            rtreeCursors[i] = new RTreeSearchCursor((IRTreeInteriorFrame) ((LSMRTreeCursorInitialState) initialState)
-                    .getRTreeInteriorFrameFactory().createFrame(),
-                    (IRTreeLeafFrame) ((LSMRTreeCursorInitialState) initialState).getRTreeLeafFrameFactory()
-                            .createFrame());
+            rtreeCursors[i] = new RTreeSearchCursor((IRTreeInteriorFrame) lsmInitialState
+                    .getRTreeInteriorFrameFactory().createFrame(), (IRTreeLeafFrame) lsmInitialState
+                    .getRTreeLeafFrameFactory().createFrame());
 
-            btreeCursors[i] = new BTreeRangeSearchCursor((IBTreeLeafFrame) ((LSMRTreeCursorInitialState) initialState)
-                    .getBTreeLeafFrameFactory().createFrame(), false);
+            btreeCursors[i] = new BTreeRangeSearchCursor((IBTreeLeafFrame) lsmInitialState.getBTreeLeafFrameFactory()
+                    .createFrame(), false);
         }
         btreeRangePredicate = new RangePredicate(true, null, null, true, true, btreeCmp, btreeCmp);
     }
@@ -124,10 +127,17 @@
         rtreeCursors = null;
         btreeCursors = null;
 
-        try {
-            lsmRTree.threadExit();
-        } catch (TreeIndexException e) {
-            throw new HyracksDataException(e);
+        // If the in-memory RTree was not included in the search, then we don't
+        // need to synchronize with a flush.
+        if (includeMemRTree) {
+            try {
+                lsmRTree.threadExit();
+            } catch (TreeIndexException e) {
+                throw new HyracksDataException(e);
+            }
+        } else {
+            // Synchronize with ongoing merges.
+            searcherRefCount.decrementAndGet();
         }
     }
 
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/Pair.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/Pair.java
new file mode 100644
index 0000000..46bce8e
--- /dev/null
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/Pair.java
@@ -0,0 +1,20 @@
+package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+
+public class Pair<A, B> {
+    private final A first;
+    private final B second;
+
+    public Pair(A first, B second) {
+        super();
+        this.first = first;
+        this.second = second;
+    }
+
+    public A getFirst() {
+        return first;
+    }
+
+    public B getSecond() {
+        return second;
+    }
+}
\ No newline at end of file