Factored out thread-concurrency code of LSM-Trees into a common harness. Modified LSMBTree and LSMRTree to use the harness.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1104 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 12b47dd..6358e87 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -17,15 +17,11 @@
 
 import java.io.File;
 import java.io.FilenameFilter;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -38,7 +34,9 @@
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoadContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
@@ -49,16 +47,19 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileNameManager;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMTree;
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
 
 public class LSMBTree implements ILSMTree {
     protected final Logger LOGGER = Logger.getLogger(LSMBTree.class.getName());
-    private static final long AFTER_MERGE_CLEANUP_SLEEP = 100;
+    
+    private final LSMHarness lsmHarness;
     
     // In-memory components.
     private final BTree memBTree;
-    private final InMemoryFreePageManager memFreePageManager;    
+    private final InMemoryFreePageManager memFreePageManager;
 
     // On-disk components.
     private final ILSMFileNameManager fileNameManager;
@@ -69,27 +70,14 @@
     private final BTreeFactory bulkLoadBTreeFactory;
     private final IBufferCache diskBufferCache;
     private final IFileMapProvider diskFileMapProvider;
-    private LinkedList<BTree> diskBTrees = new LinkedList<BTree>();
+    // List of BTree instances. Using Object for better sharing via ILSMTree + LSMHarness.
+    private LinkedList<Object> diskBTrees = new LinkedList<Object>();
     
     // Common for in-memory and on-disk components.
     private final ITreeIndexFrameFactory insertLeafFrameFactory;
     private final ITreeIndexFrameFactory deleteLeafFrameFactory;
     private final MultiComparator cmp;
     
-    // For synchronizing all operations with flushes.
-    // Currently, all operations block during a flush.
-    private int threadRefCount;
-    private boolean flushFlag;
-
-    // For synchronizing searchers with a concurrent merge.
-    private AtomicBoolean isMerging = new AtomicBoolean(false);
-    private AtomicInteger searcherRefCountA = new AtomicInteger(0);
-    private AtomicInteger searcherRefCountB = new AtomicInteger(0);
-    // Represents the current number of searcher threads that are operating on
-    // the unmerged on-disk BTrees.
-    // We alternate between searcherRefCountA and searcherRefCountB.
-    private AtomicInteger searcherRefCount = searcherRefCountA;
-    
     public LSMBTree(IBufferCache memBufferCache, InMemoryFreePageManager memFreePageManager,
             ITreeIndexFrameFactory interiorFrameFactory, ITreeIndexFrameFactory insertLeafFrameFactory,
             ITreeIndexFrameFactory deleteLeafFrameFactory, ILSMFileNameManager fileNameManager, BTreeFactory diskBTreeFactory,
@@ -104,10 +92,9 @@
         this.diskBTreeFactory = diskBTreeFactory;
         this.bulkLoadBTreeFactory = bulkLoadBTreeFactory;
         this.cmp = cmp;
-        this.diskBTrees = new LinkedList<BTree>();
-        this.threadRefCount = 0;
-        this.flushFlag = false;
+        this.diskBTrees = new LinkedList<Object>();
         this.fileNameManager = fileNameManager;
+        lsmHarness = new LSMHarness(this);
     }
 
     @Override
@@ -154,7 +141,8 @@
 
     @Override
     public void close() throws HyracksDataException {
-        for (BTree btree : diskBTrees) {
+        for (Object o : diskBTrees) {
+            BTree btree = (BTree) o;
             diskBufferCache.closeFile(btree.getFileId());
             btree.close();
         }
@@ -162,37 +150,25 @@
         memBTree.close();
     }
 
-	private void lsmPerformOp(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
-		boolean waitForFlush = true;
-		do {
-		    // Wait for ongoing flush to complete.
-			synchronized (this) {
-				if (!flushFlag) {
-					// Increments threadRefCount, to force a flush to wait for this operation to finish.
-				    // (a flush can only begin once threadRefCount == 0).
-				    threadEnter();
-				    // Proceed with operation.
-					waitForFlush = false;
-				}
-			}
-		} while (waitForFlush);
-		// TODO: This will become much simpler once the BTree supports a true upsert operation.
-		try {
-			ctx.memBTreeAccessor.insert(tuple);
-		} catch (BTreeDuplicateKeyException e) {
-			// Notice that a flush must wait for the current operation to
-			// finish (threadRefCount must reach zero).
-            // TODO: This methods below are very inefficient, we'd rather like
+    @Override
+    public void insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException, TreeIndexException {
+        LSMBTreeOpContext ctx = (LSMBTreeOpContext) ictx;
+        // TODO: This will become much simpler once the BTree supports a true upsert operation.
+        try {
+            ctx.memBTreeAccessor.insert(tuple);
+        } catch (BTreeDuplicateKeyException e) {
+            // Notice that a flush must wait for the current operation to
+            // finish (threadRefCount must reach zero).
+            // TODO: The methods below are very inefficient, we'd rather like
             // to flip the antimatter bit one single BTree traversal.
-		    if (ctx.getIndexOp() == IndexOp.DELETE) {
-		        deleteExistingKey(tuple, ctx);
-		    } else {
-		        insertOrUpdateExistingKey(tuple, ctx);
-		    }
-		}
-		threadExit();
-	}
-
+            if (ctx.getIndexOp() == IndexOp.DELETE) {
+                deleteExistingKey(tuple, ctx);
+            } else {
+                insertOrUpdateExistingKey(tuple, ctx);
+            }
+        }
+    }
+    
 	private void deleteExistingKey(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
         // We assume that tuple given by the user for deletion only contains the
         // key fields, but not any non-key fields.
@@ -218,7 +194,7 @@
             // There is a remote chance of livelocks due to this behavior.
             if (tupleCopy == null) {
                 ctx.reset(IndexOp.DELETE);
-                lsmPerformOp(tuple, ctx);
+                lsmHarness.insertUpdateOrDelete(tuple, ctx);
                 return;
             }
             memBTreeUpdate(tupleCopy, ctx);
@@ -259,7 +235,7 @@
         }
         // Restart performOp to insert the tuple.
         ctx.reset(originalOp);
-        lsmPerformOp(tuple, ctx);
+        lsmHarness.insertUpdateOrDelete(tuple, ctx);
     }
     
     private void memBTreeUpdate(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException,
@@ -273,34 +249,12 @@
             // Simply restart the operation. There is a remote chance of
             // livelocks due to this behavior.
             ctx.reset(originalOp);
-            lsmPerformOp(tuple, ctx);
-        }
-    }
-    
-    public void threadEnter() {
-        threadRefCount++;
-    }
-    
-    public void threadExit() throws HyracksDataException, TreeIndexException {
-        synchronized (this) {
-            threadRefCount--;
-            // Check if we've reached or exceeded the maximum number of pages.
-            if (!flushFlag && memFreePageManager.isFull()) {
-                flushFlag = true;
-            }
-            // Flush will only be handled by last exiting thread.
-            if (flushFlag && threadRefCount == 0) {
-                flush();
-                flushFlag = false;
-            }
+            lsmHarness.insertUpdateOrDelete(tuple, ctx);
         }
     }
 
     @Override
-    public void flush() throws HyracksDataException, TreeIndexException {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Flushing LSM-BTree.");
-        }
+    public ITreeIndex flush() throws HyracksDataException, TreeIndexException {
         // Bulk load a new on-disk BTree from the in-memory BTree.        
         RangePredicate nullPred = new RangePredicate(null, null, true, true, null, null);
         ITreeIndexAccessor memBTreeAccessor = memBTree.createAccessor();
@@ -318,13 +272,16 @@
             scanCursor.close();
         }
         diskBTree.endBulkLoad(bulkLoadCtx);
-        resetMemBTree();
-        synchronized (diskBTrees) {
-            diskBTrees.addFirst(diskBTree);
-        }
+        return diskBTree;
     }
 
-    private void resetMemBTree() throws HyracksDataException {
+    @Override
+    public void addFlushedComponent(Object index) {
+        diskBTrees.addFirst(index);
+    }
+    
+    @Override
+    public void resetInMemoryComponent() throws HyracksDataException {
         memFreePageManager.reset();
         memBTree.create(memBTree.getFileId());
     }
@@ -364,50 +321,17 @@
         return diskBTree;
     }
     
-    private List<BTree> search(ITreeIndexCursor cursor, RangePredicate pred, LSMBTreeOpContext ctx, boolean includeMemBTree) throws HyracksDataException, TreeIndexException {                
-        // If the search doesn't include the in-memory BTree, then we don't have
-        // to synchronize with a flush.
-        if (includeMemBTree) {
-            boolean waitForFlush = true;
-            do {
-                synchronized (this) {
-                    if (!flushFlag) {
-                        // The corresponding threadExit() is in
-                        // LSMTreeRangeSearchCursor.close().
-                        threadEnter();
-                        waitForFlush = false;
-                    }
-                }
-            } while (waitForFlush);
-        }
-
-        // Get a snapshot of the current on-disk BTrees.
-        // If includeMemBTree is true, then no concurrent
-        // flush can add another on-disk BTree (due to threadEnter());
-        // If includeMemBTree is false, then it is possible that a concurrent
-        // flush adds another on-disk BTree.
-        // Since this mode is only used for merging trees, it doesn't really
-        // matter if the merge excludes the new on-disk BTree.
-        List<BTree> diskBTreesSnapshot = new ArrayList<BTree>();
-        AtomicInteger localSearcherRefCount = null;
-        synchronized (diskBTrees) {
-            diskBTreesSnapshot.addAll(diskBTrees);
-            // Only remember the search ref count when performing a merge (i.e., includeMemBTree is false).
-            if (!includeMemBTree) {
-                localSearcherRefCount = searcherRefCount;
-                localSearcherRefCount.incrementAndGet();
-            }
-        }
-        
+    public void search(ITreeIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred, IIndexOpContext ictx, boolean includeMemComponent) throws HyracksDataException, TreeIndexException {
+        LSMBTreeOpContext ctx = (LSMBTreeOpContext) ictx;
         LSMBTreeRangeSearchCursor lsmTreeCursor = (LSMBTreeRangeSearchCursor) cursor;
-        int numDiskBTrees = diskBTreesSnapshot.size();
-        int numBTrees = (includeMemBTree) ? numDiskBTrees + 1 : numDiskBTrees;                
+        int numDiskBTrees = diskComponents.size();
+        int numBTrees = (includeMemComponent) ? numDiskBTrees + 1 : numDiskBTrees;                
         LSMBTreeCursorInitialState initialState = new LSMBTreeCursorInitialState(numBTrees,
-                insertLeafFrameFactory, cmp, this, includeMemBTree, localSearcherRefCount);
+                insertLeafFrameFactory, cmp, includeMemComponent, lsmHarness);
         lsmTreeCursor.open(initialState, pred);
         
         int cursorIx;
-        if (includeMemBTree) {
+        if (includeMemComponent) {
             // Open cursor of in-memory BTree at index 0.
             ctx.memBTreeAccessor.search(lsmTreeCursor.getCursor(0), pred);
             // Skip 0 because it is the in-memory BTree.
@@ -419,46 +343,27 @@
         // Open cursors of on-disk BTrees.
         ITreeIndexAccessor[] diskBTreeAccessors = new ITreeIndexAccessor[numDiskBTrees];
         int diskBTreeIx = 0;
-        ListIterator<BTree> diskBTreesIter = diskBTreesSnapshot.listIterator();
+        ListIterator<Object> diskBTreesIter = diskComponents.listIterator();
         while(diskBTreesIter.hasNext()) {
-            BTree diskBTree = diskBTreesIter.next();
+            BTree diskBTree = (BTree) diskBTreesIter.next();
             diskBTreeAccessors[diskBTreeIx] = diskBTree.createAccessor();
             diskBTreeAccessors[diskBTreeIx].search(lsmTreeCursor.getCursor(cursorIx), pred);
             cursorIx++;
             diskBTreeIx++;
         }
         lsmTreeCursor.initPriorityQueue();
-        return diskBTreesSnapshot;
     }
     
-    private void insert(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
-        lsmPerformOp(tuple, ctx);
-    }
-
-    private void delete(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
-        lsmPerformOp(tuple, ctx);
-    }
-
-    public void merge() throws HyracksDataException, TreeIndexException  {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Merging LSM-BTree.");
-        }
-        if (!isMerging.compareAndSet(false, true)) {
-            throw new TreeIndexException("Merge already in progress in LSMBTree. Only one concurrent merge allowed.");
-        }
-        
-        // Point to the current searcher ref count, so we can wait for it later
-        // (after we swap the searcher ref count).
-        AtomicInteger localSearcherRefCount = searcherRefCount;
-        
+    public ITreeIndex merge(List<Object> mergedComponents) throws HyracksDataException, TreeIndexException  {
         LSMBTreeOpContext ctx = createOpContext();
         ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor();
         RangePredicate rangePred = new RangePredicate(null, null, true, true, null, null);
         // Ordered scan, ignoring the in-memory BTree.
         // We get back a snapshot of the on-disk BTrees that are going to be
         // merged now, so we can clean them up after the merge has completed.
-        List<BTree> mergingDiskBTrees = search(cursor, (RangePredicate) rangePred, ctx, false);
-
+        List<Object> mergingDiskBTrees = lsmHarness.search(cursor, (RangePredicate) rangePred, ctx, false);
+        mergedComponents.addAll(mergingDiskBTrees);
+        
         // Bulk load the tuples from all on-disk BTrees into the new BTree.
         BTree mergedBTree = createMergeTargetBTree();
         IIndexBulkLoadContext bulkLoadCtx = mergedBTree.beginBulkLoad(1.0f);
@@ -472,42 +377,34 @@
             cursor.close();
         }
         mergedBTree.endBulkLoad(bulkLoadCtx);
-        
-        // Remove the old BTrees from the list, and add the new merged BTree.
-        // Also, swap the searchRefCount.
-        synchronized (diskBTrees) {
-            diskBTrees.removeAll(mergingDiskBTrees);
-            diskBTrees.addLast(mergedBTree);
-            // Swap the searcher ref count reference, and reset it to zero.
-            if (searcherRefCount == searcherRefCountA) {
-                searcherRefCount = searcherRefCountB;
-            } else {
-                searcherRefCount = searcherRefCountA;
-            }
-            searcherRefCount.set(0);
-        }
-        
-        // Wait for all searchers that are still accessing the old on-disk
-        // BTrees, then perform the final cleanup of the old BTrees.
-        while (localSearcherRefCount.get() != 0) {
-            try {
-                Thread.sleep(AFTER_MERGE_CLEANUP_SLEEP);
-            } catch (InterruptedException e) {
-                // Propagate the exception to the caller, so that an appropriate
-                // cleanup action can be taken.
-                throw new HyracksDataException(e);
-            }
-        }
-        
-        // Cleanup. At this point we have guaranteed that no searchers are
-        // touching the old on-disk BTrees (localSearcherRefCount == 0).
-        for (BTree oldBTree : mergingDiskBTrees) {            
+        return mergedBTree;
+    }
+    
+    @Override
+    public void addMergedComponent(Object newComponent, List<Object> mergedComponents) {
+        diskBTrees.removeAll(mergedComponents);
+        diskBTrees.addLast(newComponent);
+    }
+    
+    @Override
+    public void cleanUpAfterMerge(List<Object> mergedComponents) throws HyracksDataException {
+        for (Object o : mergedComponents) {
+            BTree oldBTree = (BTree) o;
             FileReference fileRef = diskFileMapProvider.lookupFileName(oldBTree.getFileId());
             diskBufferCache.closeFile(oldBTree.getFileId());
             oldBTree.close();
             fileRef.getFile().delete();
         }
-        isMerging.set(false);
+    }
+    
+    @Override
+    public InMemoryFreePageManager getInMemoryFreePageManager() {
+        return (InMemoryFreePageManager) memBTree.getFreePageManager();
+    }
+
+    @Override
+    public List<Object> getDiskComponents() {
+        return diskBTrees;
     }
     
     public class LSMTreeBulkLoadContext implements IIndexBulkLoadContext {
@@ -548,10 +445,8 @@
     @Override
     public void endBulkLoad(IIndexBulkLoadContext ictx) throws HyracksDataException {
         LSMTreeBulkLoadContext bulkLoadCtx = (LSMTreeBulkLoadContext) ictx;
-        bulkLoadCtx.getBTree().endBulkLoad(bulkLoadCtx.getBulkLoadCtx());
-        synchronized (diskBTrees) {
-            diskBTrees.addFirst(bulkLoadCtx.getBTree());
-        }
+        bulkLoadCtx.getBTree().endBulkLoad(bulkLoadCtx.getBulkLoadCtx());        
+        lsmHarness.addBulkLoadedComponent(bulkLoadCtx.getBTree());
     }
 
     @Override
@@ -584,6 +479,11 @@
         return memBTree.getIndexType();
     }
 
+    @Override
+    public int getFileId() {
+        return memBTree.getFileId();
+    }
+    
     public MultiComparator getMultiComparator() {
         return cmp;
     }
@@ -594,59 +494,17 @@
 
     @Override
     public ITreeIndexAccessor createAccessor() {
-        return new LSMTreeIndexAccessor(this);
+        return new LSMBTreeIndexAccessor(lsmHarness, createOpContext());
     }
-
-    private class LSMTreeIndexAccessor implements ITreeIndexAccessor {
-        private LSMBTree lsmTree;
-        private LSMBTreeOpContext ctx;
-
-        public LSMTreeIndexAccessor(LSMBTree lsmTree) {
-            this.lsmTree = lsmTree;
-            this.ctx = lsmTree.createOpContext();
-        }
-
-        @Override
-        public void insert(ITupleReference tuple) throws HyracksDataException, TreeIndexException {
-            ctx.reset(IndexOp.INSERT);
-            lsmTree.insert(tuple, ctx);
-        }
-
-        @Override
-        public void update(ITupleReference tuple) throws HyracksDataException, TreeIndexException {
-            // Update is the same as insert.
-            ctx.reset(IndexOp.INSERT);
-            insert(tuple);
-        }
-
-        @Override
-        public void delete(ITupleReference tuple) throws HyracksDataException, TreeIndexException {
-            ctx.reset(IndexOp.DELETE);
-            lsmTree.delete(tuple, ctx);            
+    
+    private class LSMBTreeIndexAccessor extends LSMTreeIndexAccessor {
+        public LSMBTreeIndexAccessor(LSMHarness lsmHarness, IIndexOpContext ctx) {
+            super(lsmHarness, ctx);
         }
 
         @Override
         public ITreeIndexCursor createSearchCursor() {
             return new LSMBTreeRangeSearchCursor();
         }
-        
-        @Override
-        public void search(ITreeIndexCursor cursor, ISearchPredicate searchPred) throws HyracksDataException,
-                TreeIndexException {
-            ctx.reset(IndexOp.SEARCH);
-            lsmTree.search(cursor, (RangePredicate) searchPred, ctx, true);
-        }
-
-        @Override
-        public ITreeIndexCursor createDiskOrderScanCursor() {
-            // Disk-order scan doesn't make sense for the LSMBTree because it cannot correctly resolve deleted tuples.
-            throw new UnsupportedOperationException("DiskOrderScan not supported by LSMTree.");
-        }
-        
-        @Override
-        public void diskOrderScan(ITreeIndexCursor cursor) throws HyracksDataException {
-            // Disk-order scan doesn't make sense for the LSMBTree because it cannot correctly resolve deleted tuples.
-            throw new UnsupportedOperationException("DiskOrderScan not supported by LSMTree.");
-        }
     }
 }
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java
index 44806c7..22b85dd 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java
@@ -15,11 +15,10 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
 
-import java.util.concurrent.atomic.AtomicInteger;
-
 import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
 import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
 
 public class LSMBTreeCursorInitialState implements ICursorInitialState {
@@ -27,18 +26,16 @@
 	private final int numBTrees;
 	private final ITreeIndexFrameFactory leafFrameFactory;
 	private final MultiComparator cmp;
-	private final LSMBTree lsmBTree;
 	private final boolean includeMemBTree;
-	private final AtomicInteger searcherRefCount;
+	private final LSMHarness lsmHarness;
 	
     public LSMBTreeCursorInitialState(int numBTrees, ITreeIndexFrameFactory leafFrameFactory, MultiComparator cmp,
-            LSMBTree lsmBTree, boolean includeMemBTree, AtomicInteger searcherRefCount) {
+            boolean includeMemBTree, LSMHarness lsmHarness) {
         this.numBTrees = numBTrees;
         this.leafFrameFactory = leafFrameFactory;
         this.cmp = cmp;
-        this.lsmBTree = lsmBTree;
         this.includeMemBTree = includeMemBTree;
-        this.searcherRefCount = searcherRefCount;
+        this.lsmHarness = lsmHarness;
     }
 	
 	public int getNumBTrees() {
@@ -62,15 +59,11 @@
 	public void setPage(ICachedPage page) {
 	}
 
-	public LSMBTree getLsm() {
-		return lsmBTree;
-	}
-	
 	public boolean getIncludeMemBTree() {
 	    return includeMemBTree;
 	}
 	
-	public AtomicInteger getSearcherRefCount() {
-	    return searcherRefCount;
+	public LSMHarness getLSMHarness() {
+	    return lsmHarness;
 	}
 }
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 15c81cc..46f563f 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -17,7 +17,6 @@
 
 import java.util.Comparator;
 import java.util.PriorityQueue;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -26,9 +25,9 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
 
@@ -40,9 +39,8 @@
     private PriorityQueueElement outputElement;
     private PriorityQueueElement reusedElement;
     private boolean needPush;
-    private LSMBTree lsmTree;
     private boolean includeMemBTree;
-    private AtomicInteger searcherRefCount;
+    private LSMHarness lsmHarness;
 
     public LSMBTreeRangeSearchCursor() {
         outputElement = null;
@@ -91,7 +89,6 @@
     @Override
     public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
         LSMBTreeCursorInitialState lsmInitialState = (LSMBTreeCursorInitialState) initialState;
-        lsmTree = lsmInitialState.getLsm();
         cmp = lsmInitialState.getCmp();
         int numBTrees = lsmInitialState.getNumBTrees();
         rangeCursors = new BTreeRangeSearchCursor[numBTrees];
@@ -100,7 +97,7 @@
             rangeCursors[i] = new BTreeRangeSearchCursor(leafFrame, false);
         }
         includeMemBTree = lsmInitialState.getIncludeMemBTree();
-        searcherRefCount = lsmInitialState.getSearcherRefCount();
+        lsmHarness = lsmInitialState.getLSMHarness();
         setPriorityQueueComparator();
     }
     
@@ -118,22 +115,14 @@
 
     @Override
     public void close() throws HyracksDataException {
-        outputPriorityQueue.clear();
-        for (int i = 0; i < rangeCursors.length; i++) {
-            rangeCursors[i].close();
-        }
-        rangeCursors = null;
-        // If the in-memory BTree was not included in the search, then we don't
-        // need to synchronize with a flush.
-        if (includeMemBTree) {
-            try {
-                lsmTree.threadExit();
-            } catch (TreeIndexException e) {
-                throw new HyracksDataException(e);
+        try {
+            outputPriorityQueue.clear();
+            for (int i = 0; i < rangeCursors.length; i++) {
+                rangeCursors[i].close();
             }
-        } else {
-            // Synchronize with ongoing merges.
-            searcherRefCount.decrementAndGet();
+            rangeCursors = null;
+        } finally {
+            lsmHarness.closeSearchCursor(includeMemBTree);
         }
     }
     
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMTree.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMTree.java
index 94dc6b3..69068e2 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMTree.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMTree.java
@@ -15,12 +15,46 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.common.api;
 
+import java.util.List;
+
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
 
+/**
+ * Methods to be implemented by an LSM index, which are called from LSMHarness.
+ * The implementations of the methods below should be thread agnostic.
+ * Synchronization of LSM operations like updates/searches/flushes/merges are
+ * done by the LSMHarness. For example, a flush() implementation should only
+ * create and return the new on-disk component, ignoring the fact that
+ * concurrent searches/updates/merges may be ongoing.
+ * 
+ */
 public interface ILSMTree extends ITreeIndex {
-    public void merge() throws HyracksDataException, TreeIndexException;
+    public void insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException,
+            TreeIndexException;
 
-    public void flush() throws HyracksDataException, TreeIndexException;
+    public void search(ITreeIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred,
+            IIndexOpContext ictx, boolean includeMemComponent) throws HyracksDataException, TreeIndexException;
+
+    public Object merge(List<Object> mergedComponents) throws HyracksDataException, TreeIndexException;
+
+    public void addMergedComponent(Object newComponent, List<Object> mergedComponents);
+
+    public void cleanUpAfterMerge(List<Object> mergedComponents) throws HyracksDataException;
+
+    public Object flush() throws HyracksDataException, TreeIndexException;
+
+    public void addFlushedComponent(Object index);
+
+    public InMemoryFreePageManager getInMemoryFreePageManager();
+
+    public void resetInMemoryComponent() throws HyracksDataException;
+
+    public List<Object> getDiskComponents();
 }
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMTreeIndexAccessor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMTreeIndexAccessor.java
new file mode 100644
index 0000000..0289dd6
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMTreeIndexAccessor.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+
+/**
+ * Client handle for performing operations
+ * (insert/delete/update/search/diskorderscan/merge/flush) on an ILSMTree. An
+ * ILSMTreeIndexAccessor is not thread safe, but different ILSMTreeIndexAccessor
+ * can concurrently operate on the same ILSMTree (i.e., the ILSMTree must allow
+ * concurrent operations).
+ */
+public interface ILSMTreeIndexAccessor extends ITreeIndexAccessor {
+	/**
+	 * Force a flush of the in-memory component.
+	 * 
+	 * @throws HyracksDataException
+	 * @throws TreeIndexException
+	 */
+	public void flush() throws HyracksDataException, TreeIndexException;
+
+	/**
+	 * Merge all on-disk components.
+	 * 
+	 * @throws HyracksDataException
+	 * @throws TreeIndexException
+	 */
+	public void merge() throws HyracksDataException, TreeIndexException;
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
new file mode 100644
index 0000000..1954fd2
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -0,0 +1,229 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMTree;
+
+/**
+ * Common code for synchronizing LSM operations like
+ * updates/searches/flushes/merges on any ILSMTree. This class only deals with
+ * synchronizing LSM operations, and delegates the concrete implementations of
+ * actual operations to ILSMTree (passed in the c'tor).
+ * 
+ */
+public class LSMHarness {
+	protected final Logger LOGGER = Logger.getLogger(LSMHarness.class.getName());
+	protected static final long AFTER_MERGE_CLEANUP_SLEEP = 100;
+	
+	private ILSMTree lsmTree;	        
+    
+	// All accesses to the LSM-Tree's on-disk components are synchronized on diskComponentsSync.
+	private Object diskComponentsSync = new Object();
+	
+    // For synchronizing all operations with flushes.
+    // Currently, all operations block during a flush.
+    private int threadRefCount;
+    private boolean flushFlag;
+
+    // For synchronizing searchers with a concurrent merge.
+    private AtomicBoolean isMerging = new AtomicBoolean(false);
+    private AtomicInteger searcherRefCountA = new AtomicInteger(0);
+    private AtomicInteger searcherRefCountB = new AtomicInteger(0);
+    // Represents the current number of searcher threads that are operating on
+    // the unmerged on-disk BTrees.
+    // We alternate between searcherRefCountA and searcherRefCountB.
+    private AtomicInteger searcherRefCount = searcherRefCountA;
+    
+    public LSMHarness(ILSMTree lsmTree) {
+    	this.lsmTree = lsmTree;
+        this.threadRefCount = 0;
+        this.flushFlag = false;
+    }
+
+    public void threadEnter() {
+        threadRefCount++;
+    }
+    
+    public void threadExit() throws HyracksDataException, TreeIndexException {
+        synchronized (this) {
+            threadRefCount--;
+            // Check if we've reached or exceeded the maximum number of pages.
+            if (!flushFlag && lsmTree.getInMemoryFreePageManager().isFull()) {
+                flushFlag = true;
+            }
+            // Flush will only be handled by last exiting thread.
+            if (flushFlag && threadRefCount == 0) {
+                flush();
+                flushFlag = false;
+            }
+        }
+    }
+    
+	public void insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ctx) throws HyracksDataException, TreeIndexException {
+		boolean waitForFlush = true;
+		do {
+		    // Wait for ongoing flush to complete.
+			synchronized (this) {
+				if (!flushFlag) {
+					// Increments threadRefCount, to force a flush to wait for this operation to finish.
+				    // (a flush can only begin once threadRefCount == 0).
+				    threadEnter();
+				    // Proceed with operation.
+					waitForFlush = false;
+				}
+			}
+		} while (waitForFlush);
+		try {
+			lsmTree.insertUpdateOrDelete(tuple, ctx);
+		} finally {
+			threadExit();
+		}
+	}
+
+    public void flush() throws HyracksDataException, TreeIndexException {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Flushing LSM-Tree.");
+        }
+        Object newDiskComponent = lsmTree.flush();
+        lsmTree.resetInMemoryComponent();
+        synchronized (diskComponentsSync) {
+            lsmTree.addFlushedComponent(newDiskComponent);
+        }
+    }
+    
+    public List<Object> search(ITreeIndexCursor cursor, ISearchPredicate pred, IIndexOpContext ctx, boolean includeMemComponent) throws HyracksDataException, TreeIndexException {                
+        // If the search doesn't include the in-memory component, then we don't have
+        // to synchronize with a flush.
+        if (includeMemComponent) {
+            boolean waitForFlush = true;
+            do {
+                synchronized (this) {
+                    if (!flushFlag) {
+                        // The corresponding threadExit() is in
+                        // LSMTreeRangeSearchCursor.close().
+                        threadEnter();
+                        waitForFlush = false;
+                    }
+                }
+            } while (waitForFlush);
+        }
+
+        // Get a snapshot of the current on-disk BTrees.
+        // If includeMemBTree is true, then no concurrent
+        // flush can add another on-disk BTree (due to threadEnter());
+        // If includeMemBTree is false, then it is possible that a concurrent
+        // flush adds another on-disk BTree.
+        // Since this mode is only used for merging trees, it doesn't really
+        // matter if the merge excludes the new on-disk BTree.
+        List<Object> diskComponentSnapshot = new ArrayList<Object>();
+        AtomicInteger localSearcherRefCount = null;
+        synchronized (diskComponentsSync) {
+        	diskComponentSnapshot.addAll(lsmTree.getDiskComponents());
+            // Only remember the search ref count when performing a merge (i.e., includeMemComponent is false).
+            if (!includeMemComponent) {
+                localSearcherRefCount = searcherRefCount;
+                localSearcherRefCount.incrementAndGet();
+            }
+        }
+        
+        lsmTree.search(cursor, diskComponentSnapshot, pred, ctx, includeMemComponent);
+        return diskComponentSnapshot;
+    }
+
+    public void merge() throws HyracksDataException, TreeIndexException  {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Merging LSM-BTree.");
+        }
+        if (!isMerging.compareAndSet(false, true)) {
+            throw new TreeIndexException("Merge already in progress in LSMBTree. Only one concurrent merge allowed.");
+        }
+        
+        // Point to the current searcher ref count, so we can wait for it later
+        // (after we swap the searcher ref count).
+        AtomicInteger localSearcherRefCount = searcherRefCount;
+        
+        List<Object> mergedComponents = new ArrayList<Object>();
+        Object newComponent = lsmTree.merge(mergedComponents);
+        
+        // Remove the old BTrees from the list, and add the new merged BTree.
+        // Also, swap the searchRefCount.
+        synchronized (diskComponentsSync) {
+        	lsmTree.addMergedComponent(newComponent, mergedComponents);
+            // Swap the searcher ref count reference, and reset it to zero.
+            if (searcherRefCount == searcherRefCountA) {
+                searcherRefCount = searcherRefCountB;
+            } else {
+                searcherRefCount = searcherRefCountA;
+            }
+            searcherRefCount.set(0);
+        }
+        
+        // Wait for all searchers that are still accessing the old on-disk
+        // BTrees, then perform the final cleanup of the old BTrees.
+        while (localSearcherRefCount.get() != 0) {
+            try {
+                Thread.sleep(AFTER_MERGE_CLEANUP_SLEEP);
+            } catch (InterruptedException e) {
+                // Propagate the exception to the caller, so that an appropriate
+                // cleanup action can be taken.
+                throw new HyracksDataException(e);
+            }
+        }
+        
+        // Cleanup. At this point we have guaranteed that no searchers are
+        // touching the old on-disk BTrees (localSearcherRefCount == 0).
+        lsmTree.cleanUpAfterMerge(mergedComponents);
+        isMerging.set(false);
+    }
+    
+    public AtomicInteger getSearcherRefCount() {
+    	return searcherRefCount;
+    }
+    
+    public void closeSearchCursor(boolean includeMemComponent) throws HyracksDataException {
+        // If the in-memory BTree was not included in the search, then we don't
+        // need to synchronize with a flush.
+        if (includeMemComponent) {
+            try {
+                threadExit();
+            } catch (TreeIndexException e) {
+                throw new HyracksDataException(e);
+            }
+        } else {
+            // Synchronize with ongoing merges.
+            searcherRefCount.decrementAndGet();
+        }
+    }
+    
+    public void addBulkLoadedComponent(Object index) {
+    	synchronized (diskComponentsSync) {
+    		lsmTree.addFlushedComponent(index);
+    	}
+    }
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTree.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTree.java
deleted file mode 100644
index ca780b7..0000000
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTree.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-
-import java.io.File;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileNameManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMTree;
-import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
-import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
-import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
-
-public abstract class LSMTree implements ILSMTree {
-    protected static final long AFTER_MERGE_CLEANUP_SLEEP = 100;
-
-    // In-memory components.
-    protected final BTree memBTree;
-    protected final InMemoryFreePageManager memFreePageManager;
-
-    // On-disk components.
-    protected final ILSMFileNameManager fileNameManager;
-    // For creating BTree's used in flush and merge.
-    protected final BTreeFactory diskBTreeFactory;
-
-    protected final IBufferCache diskBufferCache;
-    protected final IFileMapProvider diskFileMapProvider;
-    protected LinkedList<BTree> diskBTrees = new LinkedList<BTree>();
-
-    protected final MultiComparator cmp;
-
-    // For synchronizing all operations with flushes.
-    // Currently, all operations block during a flush.
-    private int threadRefCount;
-    protected boolean flushFlag;
-
-    // For synchronizing searchers with a concurrent merge.
-    protected AtomicBoolean isMerging = new AtomicBoolean(false);
-    protected AtomicInteger searcherRefCountA = new AtomicInteger(0);
-    protected AtomicInteger searcherRefCountB = new AtomicInteger(0);
-    // Represents the current number of searcher threads that are operating on
-    // the unmerged on-disk BTrees.
-    // We alternate between searcherRefCountA and searcherRefCountB.
-    protected AtomicInteger searcherRefCount = searcherRefCountA;
-
-    public LSMTree(IBufferCache memBufferCache, InMemoryFreePageManager memFreePageManager,
-            ITreeIndexFrameFactory btreeInteriorFrameFactory, ITreeIndexFrameFactory btreeLeafFrameFactory,
-            ILSMFileNameManager fileNameManager, BTreeFactory diskBTreeFactory, IFileMapProvider diskFileMapProvider,
-            int fieldCount, MultiComparator cmp) {
-        memBTree = new BTree(memBufferCache, fieldCount, cmp, memFreePageManager, btreeInteriorFrameFactory,
-                btreeLeafFrameFactory);
-        this.memFreePageManager = memFreePageManager;
-        this.diskBufferCache = diskBTreeFactory.getBufferCache();
-        this.diskFileMapProvider = diskFileMapProvider;
-        this.diskBTreeFactory = diskBTreeFactory;
-        this.cmp = cmp;
-        this.diskBTrees = new LinkedList<BTree>();
-        this.threadRefCount = 0;
-        this.flushFlag = false;
-        this.fileNameManager = fileNameManager;
-    }
-
-    @Override
-    public void create(int indexFileId) throws HyracksDataException {
-        memBTree.create(indexFileId);
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        for (BTree btree : diskBTrees) {
-            diskBufferCache.closeFile(btree.getFileId());
-            btree.close();
-        }
-        diskBTrees.clear();
-        memBTree.close();
-    }
-
-    public void threadEnter() {
-        threadRefCount++;
-    }
-
-    public void threadExit() throws HyracksDataException, TreeIndexException {
-        synchronized (this) {
-            threadRefCount--;
-            // Check if we've reached or exceeded the maximum number of pages.
-            if (!flushFlag && memFreePageManager.isFull()) {
-                flushFlag = true;
-            }
-            // Flush will only be handled by last exiting thread.
-            if (flushFlag && threadRefCount == 0) {
-                flush();
-                flushFlag = false;
-            }
-        }
-    }
-
-    protected void cleanupTrees(List<ITreeIndex> mergingDiskTrees) throws HyracksDataException {
-        for (ITreeIndex oldTree : mergingDiskTrees) {
-            FileReference fileRef = diskFileMapProvider.lookupFileName(oldTree.getFileId());
-            diskBufferCache.closeFile(oldTree.getFileId());
-            oldTree.close();
-            fileRef.getFile().delete();
-        }
-    }
-    
-    protected void resetMemBTree() throws HyracksDataException {
-        memFreePageManager.reset();
-        memBTree.create(memBTree.getFileId());
-    }
-
-    protected ITreeIndex createFlushTargetTree(String fileName) throws HyracksDataException {
-        return createDiskTree(diskBTreeFactory, fileName, true);
-    }
-
-    protected ITreeIndex createMergeTargetTree(String fileName) throws HyracksDataException {
-        return createDiskTree(diskBTreeFactory, fileName, true);
-    }
-
-    protected ITreeIndex createDiskTree(TreeFactory diskTreeFactory, String fileName, boolean createTree)
-            throws HyracksDataException {
-        // Register the new tree file.
-        FileReference file = new FileReference(new File(fileName));
-        // File will be deleted during cleanup of merge().
-        diskBufferCache.createFile(file);
-        int diskTreeFileId = diskFileMapProvider.lookupFileId(file);
-        // File will be closed during cleanup of merge().
-        diskBufferCache.openFile(diskTreeFileId);
-        // Create new tree instance.
-        ITreeIndex diskTree = diskTreeFactory.createIndexInstance(diskTreeFileId);
-        if (createTree) {
-            diskTree.create(diskTreeFileId);
-        }
-        // Tree will be closed during cleanup of merge().
-        diskTree.open(diskTreeFileId);
-        return diskTree;
-    }
-
-    @Override
-    public abstract void flush() throws HyracksDataException, TreeIndexException;
-
-}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
new file mode 100644
index 0000000..6d9cba3
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -0,0 +1,76 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMTreeIndexAccessor;
+
+public abstract class LSMTreeIndexAccessor implements ILSMTreeIndexAccessor {
+	private LSMHarness lsmHarness;
+	private IIndexOpContext ctx;
+
+	public LSMTreeIndexAccessor(LSMHarness lsmHarness, IIndexOpContext ctx) {
+		this.lsmHarness = lsmHarness;
+		this.ctx = ctx;
+	}
+
+	@Override
+	public void insert(ITupleReference tuple) throws HyracksDataException,
+			TreeIndexException {
+		ctx.reset(IndexOp.INSERT);
+		lsmHarness.insertUpdateOrDelete(tuple, ctx);
+	}
+
+	@Override
+	public void update(ITupleReference tuple) throws HyracksDataException,
+			TreeIndexException {
+		// Update is the same as insert.
+		ctx.reset(IndexOp.INSERT);
+		lsmHarness.insertUpdateOrDelete(tuple, ctx);
+	}
+
+	@Override
+	public void delete(ITupleReference tuple) throws HyracksDataException,
+			TreeIndexException {
+		ctx.reset(IndexOp.DELETE);
+		lsmHarness.insertUpdateOrDelete(tuple, ctx);
+	}
+	
+	@Override
+	public void search(ITreeIndexCursor cursor, ISearchPredicate searchPred)
+			throws HyracksDataException, TreeIndexException {
+		ctx.reset(IndexOp.SEARCH);
+		lsmHarness.search(cursor, searchPred, ctx, true);
+	}
+
+	@Override
+	public ITreeIndexCursor createDiskOrderScanCursor() {
+		// Disk-order scan doesn't make sense for the LSMBTree because it cannot
+		// correctly resolve deleted tuples.
+		throw new UnsupportedOperationException(
+				"DiskOrderScan not supported by LSMTree.");
+	}
+
+	@Override
+	public void diskOrderScan(ITreeIndexCursor cursor)
+			throws HyracksDataException {
+		// Disk-order scan doesn't make sense for the LSMBTree because it cannot
+		// correctly resolve deleted tuples.
+		throw new UnsupportedOperationException(
+				"DiskOrderScan not supported by LSMTree.");
+	}
+	
+	@Override
+	public void flush() throws HyracksDataException, TreeIndexException {
+		lsmHarness.flush();
+	}
+
+	@Override
+	public void merge() throws HyracksDataException, TreeIndexException {
+		lsmHarness.merge();
+	}
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 8070a8e..57cb36c 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -17,20 +17,20 @@
 
 import java.io.File;
 import java.io.FilenameFilter;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoadContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
@@ -41,9 +41,12 @@
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileNameManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMTree;
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTree;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeFactory;
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
@@ -51,18 +54,47 @@
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
 
-public class LSMRTree extends LSMTree {
-
+public class LSMRTree implements ILSMTree {
+	
+	public class LSMRTreeComponent {
+		private final RTree rtree;
+		private final BTree btree;
+		
+		LSMRTreeComponent (RTree rtree, BTree btree) {
+			this.rtree = rtree;
+			this.btree = btree;
+		}
+		
+		public RTree getRTree() {
+			return rtree;
+		}
+		
+		public BTree getBTree() {
+			return btree;
+		}
+	}
+	
+	private final LSMHarness lsmHarness;
+	
     // In-memory components.
-    private final RTree memRTree;
+	private final LSMRTreeComponent memComponent;
+    protected final InMemoryFreePageManager memFreePageManager;
     private final static int MEM_RTREE_FILE_ID = 0;
     private final static int MEM_BTREE_FILE_ID = 1;
 
     // On-disk components.
+    private final ILSMFileNameManager fileNameManager;
+    protected final IBufferCache diskBufferCache;
+    protected final IFileMapProvider diskFileMapProvider;
     // For creating RTree's used in flush and merge.
     private final RTreeFactory diskRTreeFactory;
-    private LinkedList<RTree> diskRTrees = new LinkedList<RTree>();
+    // For creating BTree's used in flush and merge.
+    private final BTreeFactory diskBTreeFactory;
+    // List of LSMRTreeComponent instances. Using Object for better sharing via ILSMTree + LSMHarness.
+    private final LinkedList<Object> diskComponents = new LinkedList<Object>();
 
+    private MultiComparator btreeCmp;
+    
     // Common for in-memory and on-disk components.
     private final ITreeIndexFrameFactory rtreeInteriorFrameFactory;
     private final ITreeIndexFrameFactory btreeInteriorFrameFactory;
@@ -74,24 +106,29 @@
             ITreeIndexFrameFactory btreeInteriorFrameFactory, ITreeIndexFrameFactory btreeLeafFrameFactory,
             ILSMFileNameManager fileNameManager, RTreeFactory diskRTreeFactory, BTreeFactory diskBTreeFactory,
             IFileMapProvider diskFileMapProvider, int fieldCount, MultiComparator rtreeCmp, MultiComparator btreeCmp) {
-
-        super(memBufferCache, memFreePageManager, btreeInteriorFrameFactory, btreeLeafFrameFactory, fileNameManager,
-                diskBTreeFactory, diskFileMapProvider, fieldCount, btreeCmp);
-        memRTree = new RTree(memBufferCache, fieldCount, rtreeCmp, memFreePageManager, rtreeInteriorFrameFactory,
+        RTree memRTree = new RTree(memBufferCache, fieldCount, rtreeCmp, memFreePageManager, rtreeInteriorFrameFactory,
                 rtreeLeafFrameFactory);
-
+        BTree memBTree = new BTree(memBufferCache, fieldCount, btreeCmp, memFreePageManager, btreeInteriorFrameFactory,
+                btreeLeafFrameFactory);
+        memComponent = new LSMRTreeComponent(memRTree, memBTree);
+        this.memFreePageManager = memFreePageManager;
+        this.diskBufferCache = diskBTreeFactory.getBufferCache();
+        this.diskFileMapProvider = diskFileMapProvider;
+        this.diskBTreeFactory = diskBTreeFactory;
+        this.fileNameManager = fileNameManager;
         this.rtreeInteriorFrameFactory = rtreeInteriorFrameFactory;
         this.rtreeLeafFrameFactory = rtreeLeafFrameFactory;
         this.btreeInteriorFrameFactory = btreeInteriorFrameFactory;
         this.btreeLeafFrameFactory = btreeLeafFrameFactory;
-
         this.diskRTreeFactory = diskRTreeFactory;
+        this.btreeCmp = btreeCmp;
+        this.lsmHarness = new LSMHarness(this);
     }
 
     @Override
     public void create(int indexFileId) throws HyracksDataException {
-        super.create(MEM_BTREE_FILE_ID);
-        memRTree.create(MEM_RTREE_FILE_ID);
+    	memComponent.getRTree().create(MEM_RTREE_FILE_ID);
+    	memComponent.getBTree().create(MEM_BTREE_FILE_ID);
     }
 
     /**
@@ -112,8 +149,8 @@
      */
     @Override
     public void open(int indexFileId) throws HyracksDataException {
-        memRTree.open(MEM_RTREE_FILE_ID);
-        memBTree.open(MEM_BTREE_FILE_ID);
+    	memComponent.getRTree().open(MEM_RTREE_FILE_ID);
+    	memComponent.getBTree().open(MEM_BTREE_FILE_ID);
         File dir = new File(fileNameManager.getBaseDir());
         FilenameFilter rtreeFilter = new FilenameFilter() {
             public boolean accept(File dir, String name) {
@@ -135,34 +172,52 @@
 
         Comparator<String> fileNameCmp = fileNameManager.getFileNameComparator();
         Arrays.sort(rtreeFiles, fileNameCmp);
-        for (String fileName : rtreeFiles) {
-            RTree rtree = (RTree) createDiskTree(diskRTreeFactory, fileName, false);
-            diskRTrees.add(rtree);
-        }
-
         Arrays.sort(btreeFiles, fileNameCmp);
-        for (String fileName : btreeFiles) {
-            BTree btree = (BTree) createDiskTree(diskBTreeFactory, fileName, false);
-            diskBTrees.add(btree);
+        // Assert rtreeFiles.size() == btreeFiles.size() 
+        for (int i = 0; i < rtreeFiles.length; i++) {
+        	RTree rtree = (RTree) createDiskTree(diskRTreeFactory, rtreeFiles[i], false);
+        	BTree btree = (BTree) createDiskTree(diskBTreeFactory, btreeFiles[i], false);
+        	LSMRTreeComponent diskComponent = new LSMRTreeComponent(rtree, btree);
+        	diskComponents.add(diskComponent);
         }
     }
 
     @Override
     public void close() throws HyracksDataException {
-        super.close();
-        for (RTree rtree : diskRTrees) {
+        for (Object o : diskComponents) {
+            LSMRTreeComponent diskComponent = (LSMRTreeComponent) o;
+            RTree rtree = diskComponent.getRTree();
+            BTree btree = diskComponent.getBTree();
             diskBufferCache.closeFile(rtree.getFileId());
             rtree.close();
+            diskBufferCache.closeFile(btree.getFileId());
+            btree.close();
         }
-        diskRTrees.clear();
-        memRTree.close();
+        diskComponents.clear();
+        memComponent.getRTree().close();
+        memComponent.getBTree().close();
     }
 
-    @Override
-    public ITreeIndexAccessor createAccessor() {
-        return new LSMRTreeAccessor(this);
+    // TODO: Candidate for more code sharing.
+    protected ITreeIndex createDiskTree(TreeFactory diskTreeFactory, String fileName, boolean createTree)
+            throws HyracksDataException {
+        // Register the new tree file.
+        FileReference file = new FileReference(new File(fileName));
+        // File will be deleted during cleanup of merge().
+        diskBufferCache.createFile(file);
+        int diskTreeFileId = diskFileMapProvider.lookupFileId(file);
+        // File will be closed during cleanup of merge().
+        diskBufferCache.openFile(diskTreeFileId);
+        // Create new tree instance.
+        ITreeIndex diskTree = diskTreeFactory.createIndexInstance(diskTreeFileId);
+        if (createTree) {
+            diskTree.create(diskTreeFileId);
+        }
+        // Tree will be closed during cleanup of merge().
+        diskTree.open(diskTreeFileId);
+        return diskTree;
     }
-
+    
     @Override
     public IIndexBulkLoadContext beginBulkLoad(float fillFactor) throws TreeIndexException, HyracksDataException {
         // Note that by using a flush target file name, we state that the new
@@ -189,10 +244,8 @@
     public void endBulkLoad(IIndexBulkLoadContext ictx) throws HyracksDataException {
         LSMRTreeBulkLoadContext bulkLoadCtx = (LSMRTreeBulkLoadContext) ictx;
         bulkLoadCtx.getRTree().endBulkLoad(bulkLoadCtx.getBulkLoadCtx());
-        synchronized (diskRTrees) {
-            diskRTrees.addFirst(bulkLoadCtx.getRTree());
-            diskBTrees.addFirst(bulkLoadCtx.getBTree());
-        }
+        LSMRTreeComponent diskComponent = new LSMRTreeComponent(bulkLoadCtx.getRTree(), bulkLoadCtx.getBTree());
+        lsmHarness.addBulkLoadedComponent(diskComponent);
     }
 
     @Override
@@ -230,82 +283,24 @@
         return 0;
     }
 
-    private void insertOrDelete(ITupleReference tuple, ITreeIndexAccessor accessor) throws HyracksDataException,
+    public void insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException,
             TreeIndexException {
-        boolean waitForFlush = true;
-        do {
-            // Wait for ongoing flush to complete.
-            synchronized (this) {
-                if (!flushFlag) {
-                    // Increments threadRefCount, to force a flush to wait for
-                    // this operation to finish.
-                    // (a flush can only begin once threadRefCount == 0).
-                    threadEnter();
-                    // Proceed with operation.
-                    waitForFlush = false;
-                }
-            }
-        } while (waitForFlush);
-        accessor.insert(tuple);
-        try {
-            threadExit();
-        } catch (Exception e) {
-            e.printStackTrace();
+        LSMRTreeOpContext ctx = (LSMRTreeOpContext) ictx;
+        if (ctx.getIndexOp() == IndexOp.INSERT) {
+            ctx.memRTreeAccessor.insert(tuple);
+        } else {
+            // Assert ctx.getIndexOp() == IndexOp.DELETE
+            ctx.memBTreeAccessor.insert(tuple);
         }
     }
 
-    private void insert(ITupleReference tuple, LSMRTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
-        insertOrDelete(tuple, ctx.memRTreeAccessor);
-    }
-
-    private void delete(ITupleReference tuple, LSMRTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
-        insertOrDelete(tuple, ctx.memBTreeAccessor);
-    }
-
-    private Pair<List<ITreeIndex>, List<ITreeIndex>> search(ITreeIndexCursor cursor, ISearchPredicate rtreeSearchPred,
-            LSMRTreeOpContext ctx, boolean includeMemRTree) throws HyracksDataException, TreeIndexException {
-        // If the search doesn't include the in-memory RTree, then we don't have
-        // to synchronize with a flush.
-        if (includeMemRTree) {
-            boolean waitForFlush = true;
-            do {
-                synchronized (this) {
-                    if (!flushFlag) {
-                        // The corresponding threadExit() is in
-                        // LSMTreeRangeSearchCursor.close().
-                        threadEnter();
-                        waitForFlush = false;
-                    }
-                }
-            } while (waitForFlush);
-        }
-
-        // Get a snapshot of the current on-disk RTrees and BTrees.
-        // If includeMemRTree is true, then no concurrent
-        // flush can add another on-disk RTree (due to threadEnter());
-        // If includeMemRTree is false, then it is possible that a concurrent
-        // flush adds another on-disk RTree.
-        // Since this mode is only used for merging trees, it doesn't really
-        // matter if the merge excludes the new on-disk RTree.
-        List<ITreeIndex> diskRTreesSnapshot = new ArrayList<ITreeIndex>();
-        List<ITreeIndex> diskBTreesSnapshot = new ArrayList<ITreeIndex>();
-        AtomicInteger localSearcherRefCount = null;
-        synchronized (diskRTrees) {
-            diskRTreesSnapshot.addAll(diskRTrees);
-            diskBTreesSnapshot.addAll(diskBTrees);
-            // Only remember the search ref count when performing a merge (i.e.,
-            // includeMemRTree is false).
-            if (!includeMemRTree) {
-                localSearcherRefCount = searcherRefCount;
-                localSearcherRefCount.incrementAndGet();
-            }
-        }
-
-        int numDiskTrees = diskRTreesSnapshot.size();
-
+    public void search(ITreeIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred,
+            IIndexOpContext ictx, boolean includeMemComponent) throws HyracksDataException, TreeIndexException {
+        LSMRTreeOpContext ctx = (LSMRTreeOpContext) ictx;
+        int numDiskTrees = diskComponents.size();
         ITreeIndexAccessor[] bTreeAccessors;
         int diskBTreeIx = 0;
-        if (includeMemRTree) {
+        if (includeMemComponent) {
             bTreeAccessors = new ITreeIndexAccessor[numDiskTrees + 1];
             bTreeAccessors[0] = ctx.memBTreeAccessor;
             diskBTreeIx++;
@@ -313,22 +308,23 @@
             bTreeAccessors = new ITreeIndexAccessor[numDiskTrees];
         }
 
-        ListIterator<ITreeIndex> diskBTreesIter = diskBTreesSnapshot.listIterator();
+        ListIterator<Object> diskBTreesIter = diskComponents.listIterator();
         while (diskBTreesIter.hasNext()) {
-            BTree diskBTree = (BTree) diskBTreesIter.next();
+            LSMRTreeComponent component = (LSMRTreeComponent) diskBTreesIter.next();
+            BTree diskBTree = component.getBTree();
             bTreeAccessors[diskBTreeIx] = diskBTree.createAccessor();
             diskBTreeIx++;
         }
 
         LSMRTreeSearchCursor lsmRTreeCursor = (LSMRTreeSearchCursor) cursor;
         LSMRTreeCursorInitialState initialState = new LSMRTreeCursorInitialState(numDiskTrees + 1,
-                rtreeLeafFrameFactory, rtreeInteriorFrameFactory, btreeLeafFrameFactory, cmp, bTreeAccessors, this,
-                includeMemRTree, localSearcherRefCount);
-        lsmRTreeCursor.open(initialState, rtreeSearchPred);
+                rtreeLeafFrameFactory, rtreeInteriorFrameFactory, btreeLeafFrameFactory, btreeCmp, bTreeAccessors, 
+                includeMemComponent, lsmHarness);
+        lsmRTreeCursor.open(initialState, pred);
 
         int cursorIx = 1;
-        if (includeMemRTree) {
-            ctx.memRTreeAccessor.search(((LSMRTreeSearchCursor) lsmRTreeCursor).getCursor(0), rtreeSearchPred);
+        if (includeMemComponent) {
+            ctx.memRTreeAccessor.search(((LSMRTreeSearchCursor) lsmRTreeCursor).getCursor(0), pred);
             cursorIx = 1;
         } else {
             cursorIx = 0;
@@ -336,25 +332,23 @@
 
         // Open cursors of on-disk RTrees
         ITreeIndexAccessor[] diskRTreeAccessors = new ITreeIndexAccessor[numDiskTrees];
-        ListIterator<ITreeIndex> diskRTreesIter = diskRTreesSnapshot.listIterator();
+        ListIterator<Object> diskRTreesIter = diskComponents.listIterator();
 
         int diskRTreeIx = 0;
         while (diskRTreesIter.hasNext()) {
-            RTree diskRTree = (RTree) diskRTreesIter.next();
+            LSMRTreeComponent component = (LSMRTreeComponent) diskRTreesIter.next();
+            RTree diskRTree = component.getRTree();
             diskRTreeAccessors[diskRTreeIx] = diskRTree.createAccessor();
-            diskRTreeAccessors[diskRTreeIx].search(lsmRTreeCursor.getCursor(cursorIx), rtreeSearchPred);
+            diskRTreeAccessors[diskRTreeIx].search(lsmRTreeCursor.getCursor(cursorIx), pred);
             cursorIx++;
             diskRTreeIx++;
         }
-        return new Pair<List<ITreeIndex>, List<ITreeIndex>>(diskRTreesSnapshot, diskBTreesSnapshot);
-
     }
 
     @Override
-    public void flush() throws HyracksDataException, TreeIndexException {
-
+    public Object flush() throws HyracksDataException, TreeIndexException {
         // scan the memory RTree
-        ITreeIndexAccessor memRTreeAccessor = memRTree.createAccessor();
+        ITreeIndexAccessor memRTreeAccessor = memComponent.getRTree().createAccessor();
         ITreeIndexCursor rtreeScanCursor = memRTreeAccessor.createSearchCursor();
         SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
         memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
@@ -377,7 +371,7 @@
         diskRTree.endBulkLoad(rtreeBulkLoadCtx);
 
         // scan the memory BTree
-        ITreeIndexAccessor memBTreeAccessor = memBTree.createAccessor();
+        ITreeIndexAccessor memBTreeAccessor = memComponent.getBTree().createAccessor();
         ITreeIndexCursor btreeScanCursor = memBTreeAccessor.createSearchCursor();
         RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null, null);
         memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate);
@@ -396,32 +390,17 @@
             btreeScanCursor.close();
         }
         diskBTree.endBulkLoad(btreeBulkLoadCtx);
-
-        resetMemoryTrees();
-
-        synchronized (diskRTrees) {
-            diskRTrees.addFirst(diskRTree);
-            diskBTrees.addFirst(diskBTree);
-        }
+        return new LSMRTreeComponent(diskRTree, diskBTree);
     }
 
     @Override
-    public void merge() throws HyracksDataException, TreeIndexException {
-        if (!isMerging.compareAndSet(false, true)) {
-            throw new TreeIndexException("Merge already in progress in LSMRTree. Only one concurrent merge allowed.");
-        }
-
-        // Point to the current searcher ref count, so we can wait for it later
-        // (after we swap the searcher ref count).
-        AtomicInteger localSearcherRefCount = searcherRefCount;
-
-        LSMRTreeOpContext ctx = createOpContext();
+    public Object merge(List<Object> mergedComponents) throws HyracksDataException, TreeIndexException {
+        IIndexOpContext ctx = createOpContext();
         ITreeIndexCursor cursor = new LSMRTreeSearchCursor();
-        SearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
+        ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
         // Scan the RTrees, ignoring the in-memory RTree.
-        Pair<List<ITreeIndex>, List<ITreeIndex>> mergingDiskTreesPair = search(cursor, rtreeSearchPred, ctx, false);
-        List<ITreeIndex> mergingDiskRTrees = mergingDiskTreesPair.getFirst();
-        List<ITreeIndex> mergingDiskBTrees = mergingDiskTreesPair.getSecond();
+        List<Object> mergingComponents = lsmHarness.search(cursor, rtreeSearchPred, ctx, false);
+        mergedComponents.addAll(mergingComponents);
 
         // Bulk load the tuples from all on-disk RTrees into the new RTree.
         String fileName = fileNameManager.getMergeFileName();
@@ -439,115 +418,76 @@
             cursor.close();
         }
         mergedRTree.endBulkLoad(bulkLoadCtx);
-
-        // Remove the old RTrees and BTrees from the list, and add the new
-        // merged RTree and an empty BTree
-        // Also, swap the searchRefCount.
-        synchronized (diskRTrees) {
-            diskRTrees.removeAll(mergingDiskRTrees);
-            diskRTrees.addLast(mergedRTree);
-
-            diskBTrees.removeAll(mergingDiskBTrees);
-            diskBTrees.addLast(mergedBTree);
-            // Swap the searcher ref count reference, and reset it to zero.
-            if (searcherRefCount == searcherRefCountA) {
-                searcherRefCount = searcherRefCountB;
-            } else {
-                searcherRefCount = searcherRefCountA;
-            }
-            searcherRefCount.set(0);
-        }
-
-        // Wait for all searchers that are still accessing the old on-disk
-        // RTrees and BTrees, then perform the final cleanup of the old RTrees
-        // and BTrees.
-        while (localSearcherRefCount.get() != 0) {
-            try {
-                Thread.sleep(AFTER_MERGE_CLEANUP_SLEEP);
-            } catch (InterruptedException e) {
-                // Propagate the exception to the caller, so that an appropriate
-                // cleanup action can be taken.
-                throw new HyracksDataException(e);
-            }
-        }
-
-        // Cleanup. At this point we have guaranteed that no searchers are
-        // touching the old on-disk RTrees and BTrees (localSearcherRefCount ==
-        // 0).
-        cleanupTrees(mergingDiskRTrees);
-        cleanupTrees(mergingDiskBTrees);
-        isMerging.set(false);
-
+        return new LSMRTreeComponent(mergedRTree, mergedBTree);
     }
 
-    public void resetMemoryTrees() throws HyracksDataException {
-        resetMemBTree();
-        memRTree.create(MEM_RTREE_FILE_ID);
+    @Override
+    public void addMergedComponent(Object newComponent, List<Object> mergedComponents) {
+        diskComponents.removeAll(mergedComponents);
+        diskComponents.addLast((LSMRTreeComponent) newComponent);
     }
 
+    @Override
+    public void cleanUpAfterMerge(List<Object> mergedComponents) throws HyracksDataException {        
+        for (Object o : mergedComponents) {
+            LSMRTreeComponent component = (LSMRTreeComponent) o;
+            BTree oldBTree = component.getBTree();            
+            FileReference btreeFileRef = diskFileMapProvider.lookupFileName(oldBTree.getFileId());
+            diskBufferCache.closeFile(oldBTree.getFileId());
+            oldBTree.close();
+            btreeFileRef.getFile().delete();
+            RTree oldRTree = component.getRTree();
+            FileReference rtreeFileRef = diskFileMapProvider.lookupFileName(oldRTree.getFileId());
+            diskBufferCache.closeFile(oldRTree.getFileId());
+            oldRTree.close();
+            rtreeFileRef.getFile().delete();
+        }
+    }
+
+    @Override
+    public void addFlushedComponent(Object index) {
+        diskComponents.addFirst((LSMRTreeComponent) index);
+    }
+
+    @Override
+    public InMemoryFreePageManager getInMemoryFreePageManager() {
+        return memFreePageManager;
+    }
+
+    @Override
+    public void resetInMemoryComponent() throws HyracksDataException {
+        memComponent.getRTree().create(MEM_RTREE_FILE_ID);        
+        memComponent.getBTree().create(MEM_BTREE_FILE_ID);
+        memFreePageManager.reset();
+    }
+
+    @Override
+    public List<Object> getDiskComponents() {
+        return diskComponents;
+    }
+    
     protected LSMRTreeOpContext createOpContext() {
-
-        return new LSMRTreeOpContext((RTree.RTreeAccessor) memRTree.createAccessor(),
+        return new LSMRTreeOpContext((RTree.RTreeAccessor) memComponent.getRTree().createAccessor(),
                 (IRTreeLeafFrame) rtreeLeafFrameFactory.createFrame(),
                 (IRTreeInteriorFrame) rtreeInteriorFrameFactory.createFrame(), memFreePageManager
-                        .getMetaDataFrameFactory().createFrame(), 8, (BTree.BTreeAccessor) memBTree.createAccessor(),
+                        .getMetaDataFrameFactory().createFrame(), 8, (BTree.BTreeAccessor) memComponent.getBTree().createAccessor(),
                 btreeLeafFrameFactory, btreeInteriorFrameFactory, memFreePageManager.getMetaDataFrameFactory()
-                        .createFrame(), cmp);
+                        .createFrame(), btreeCmp);
     }
-
-    private class LSMRTreeAccessor implements ITreeIndexAccessor {
-        private LSMRTree lsmRTree;
-        private LSMRTreeOpContext ctx;
-
-        public LSMRTreeAccessor(LSMRTree lsmRTree) {
-            this.lsmRTree = lsmRTree;
-            this.ctx = lsmRTree.createOpContext();
-
-        }
-
-        @Override
-        public void insert(ITupleReference tuple) throws HyracksDataException, TreeIndexException {
-            ctx.reset(IndexOp.INSERT);
-            lsmRTree.insert(tuple, ctx);
-        }
-
-        @Override
-        public void update(ITupleReference tuple) throws HyracksDataException, TreeIndexException {
-            throw new UnsupportedOperationException("Update not supported by LSMRTree");
-        }
-
-        @Override
-        public void delete(ITupleReference tuple) throws HyracksDataException, TreeIndexException {
-            ctx.reset(IndexOp.DELETE);
-            lsmRTree.delete(tuple, ctx);
+    
+    @Override
+    public ITreeIndexAccessor createAccessor() {
+        return new LSMRTreeAccessor(lsmHarness, createOpContext());
+    }
+    
+    private class LSMRTreeAccessor extends LSMTreeIndexAccessor {
+        public LSMRTreeAccessor(LSMHarness lsmHarness, IIndexOpContext ctx) {
+            super(lsmHarness, ctx);
         }
 
         @Override
         public ITreeIndexCursor createSearchCursor() {
             return new LSMRTreeSearchCursor();
         }
-
-        @Override
-        public void search(ITreeIndexCursor cursor, ISearchPredicate searchPred) throws HyracksDataException,
-                TreeIndexException {
-            ctx.reset(IndexOp.SEARCH);
-            // TODO: fix exception handling throughout LSM tree.
-            try {
-                lsmRTree.search(cursor, searchPred, ctx, true);
-            } catch (Exception e) {
-                throw new HyracksDataException(e);
-            }
-        }
-
-        @Override
-        public ITreeIndexCursor createDiskOrderScanCursor() {
-            throw new UnsupportedOperationException("DiskOrderScan not supported by LSMRTree.");
-        }
-
-        @Override
-        public void diskOrderScan(ITreeIndexCursor cursor) throws HyracksDataException {
-            throw new UnsupportedOperationException("DiskOrderScan not supported by LSMRTree.");
-        }
     }
-
 }
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeCursorInitialState.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeCursorInitialState.java
index 487d7d1..c32bf06 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeCursorInitialState.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeCursorInitialState.java
@@ -15,12 +15,11 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
 
-import java.util.concurrent.atomic.AtomicInteger;
-
 import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
 import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
 
 public class LSMRTreeCursorInitialState implements ICursorInitialState {
@@ -30,24 +29,22 @@
     private ITreeIndexFrameFactory rtreeLeafFrameFactory;
     private ITreeIndexFrameFactory btreeLeafFrameFactory;
     private MultiComparator btreeCmp;
-    private LSMRTree lsmRTree;
     private ITreeIndexAccessor[] bTreeAccessors;
     private final boolean includeMemRTree;
-    private final AtomicInteger searcherRefCount;
+    private final LSMHarness lsmHarness;
 
     public LSMRTreeCursorInitialState(int numberOfTrees, ITreeIndexFrameFactory rtreeLeafFrameFactory,
             ITreeIndexFrameFactory rtreeInteriorFrameFactory, ITreeIndexFrameFactory btreeLeafFrameFactory,
-            MultiComparator btreeCmp, ITreeIndexAccessor[] bTreeAccessors, LSMRTree lsmRTree, boolean includeMemRTree,
-            AtomicInteger searcherRefCount) {
+            MultiComparator btreeCmp, ITreeIndexAccessor[] bTreeAccessors, boolean includeMemRTree,
+            LSMHarness lsmHarness) {
         this.numberOfTrees = numberOfTrees;
         this.rtreeLeafFrameFactory = rtreeLeafFrameFactory;
         this.rtreeInteriorFrameFactory = rtreeInteriorFrameFactory;
         this.btreeLeafFrameFactory = btreeLeafFrameFactory;
         this.btreeCmp = btreeCmp;
-        this.lsmRTree = lsmRTree;
         this.bTreeAccessors = bTreeAccessors;
         this.includeMemRTree = includeMemRTree;
-        this.searcherRefCount = searcherRefCount;
+        this.lsmHarness = lsmHarness;
     }
 
     public int getNumberOfTrees() {
@@ -83,16 +80,12 @@
         return bTreeAccessors;
     }
 
-    public LSMRTree getLsmRTree() {
-        return lsmRTree;
-    }
-
     public boolean getIncludeMemRTree() {
         return includeMemRTree;
     }
 
-    public AtomicInteger getSearcherRefCount() {
-        return searcherRefCount;
+    public LSMHarness getLSMHarness() {
+        return lsmHarness;
     }
 
 }
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
index f8a4432..5be0e63 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
@@ -33,7 +33,8 @@
     private BTreeOpContext btreeOpContext;
     public final RTree.RTreeAccessor memRTreeAccessor;
     public final BTree.BTreeAccessor memBTreeAccessor;
-
+    private IndexOp op;
+    
     public LSMRTreeOpContext(RTree.RTreeAccessor memRtreeAccessor, IRTreeLeafFrame rtreeLeafFrame,
             IRTreeInteriorFrame rtreeInteriorFrame, ITreeIndexMetaDataFrame rtreeMetaFrame, int rTreeHeightHint,
             BTree.BTreeAccessor memBtreeAccessor, ITreeIndexFrameFactory btreeLeafFrameFactory,
@@ -53,6 +54,7 @@
         } else if (newOp == IndexOp.DELETE) {
             btreeOpContext.reset(IndexOp.INSERT);
         }
+        this.op = newOp;
     }
 
     @Override
@@ -60,4 +62,7 @@
 
     }
 
+    public IndexOp getIndexOp() {
+    	return op;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
index cdcbb2b..9fd47e9 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
@@ -15,8 +15,6 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
 
-import java.util.concurrent.atomic.AtomicInteger;
-
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
@@ -28,6 +26,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.RTreeSearchCursor;
@@ -42,11 +41,10 @@
     private int currentCursror;
     private MultiComparator btreeCmp;
     private int numberOfTrees;
-    private LSMRTree lsmRTree;
     private RangePredicate btreeRangePredicate;
     private ITupleReference frameTuple;
     private boolean includeMemRTree;
-    private AtomicInteger searcherRefCount;
+    private LSMHarness lsmHarness;
 
     public LSMRTreeSearchCursor() {
         currentCursror = 0;
@@ -106,10 +104,9 @@
     @Override
     public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
         LSMRTreeCursorInitialState lsmInitialState = (LSMRTreeCursorInitialState) initialState;
-        lsmRTree = lsmInitialState.getLsmRTree();
         btreeCmp = lsmInitialState.getBTreeCmp();
         includeMemRTree = lsmInitialState.getIncludeMemRTree();
-        searcherRefCount = lsmInitialState.getSearcherRefCount();
+        lsmHarness = lsmInitialState.getLSMHarness();
         numberOfTrees = lsmInitialState.getNumberOfTrees();
         diskBTreeAccessors = lsmInitialState.getBTreeAccessors();
 
@@ -135,24 +132,15 @@
 
     @Override
     public void close() throws HyracksDataException {
-        for (int i = 0; i < numberOfTrees; i++) {
+        try {
+    	for (int i = 0; i < numberOfTrees; i++) {
             rtreeCursors[i].close();
             btreeCursors[i].close();
         }
         rtreeCursors = null;
         btreeCursors = null;
-
-        // If the in-memory RTree was not included in the search, then we don't
-        // need to synchronize with a flush.
-        if (includeMemRTree) {
-            try {
-                lsmRTree.threadExit();
-            } catch (TreeIndexException e) {
-                throw new HyracksDataException(e);
-            }
-        } else {
-            // Synchronize with ongoing merges.
-            searcherRefCount.decrementAndGet();
+        } finally {
+        	lsmHarness.closeSearchCursor(includeMemRTree);
         }
     }
 
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
index 0113bff..90c41d9 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
@@ -23,7 +23,7 @@
 import edu.uci.ics.hyracks.storage.am.btree.tests.IOrderedIndexTestContext;
 import edu.uci.ics.hyracks.storage.am.btree.tests.OrderedIndexTestDriver;
 import edu.uci.ics.hyracks.storage.am.btree.tests.OrderedIndexTestUtils;
-import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMTreeIndexAccessor;
 
 @SuppressWarnings("rawtypes")
 public abstract class LSMBTreeMergeTestDriver extends OrderedIndexTestDriver {
@@ -57,8 +57,8 @@
                 }
             }
 
-            LSMBTree lsmBTree = (LSMBTree) ctx.getIndex();
-            lsmBTree.merge();
+            ILSMTreeIndexAccessor accessor = (ILSMTreeIndexAccessor) ctx.getIndexAccessor();
+            accessor.merge();
             
             OrderedIndexTestUtils.checkPointSearches(ctx);
             OrderedIndexTestUtils.checkOrderedScan(ctx);