Implemented LSM-BTree merge including correct behavior with concurrent accesses.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1081 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 0072cf2..005e712 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -17,10 +17,14 @@
 
 import java.io.File;
 import java.io.FilenameFilter;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.ListIterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -50,6 +54,7 @@
 
 public class LSMBTree implements ILSMTree {
     protected final Logger LOGGER = Logger.getLogger(LSMBTree.class.getName());
+    private static final long AFTER_MERGE_CLEANUP_SLEEP = 100;
     
     // In-memory components.
     private final BTree memBTree;
@@ -71,10 +76,20 @@
     private final ITreeIndexFrameFactory deleteLeafFrameFactory;
     private final MultiComparator cmp;
     
-    // For dealing with concurrent accesses.
+    // For synchronizing all operations with flushes.
+    // Currently, all operations block during a flush.
     private int threadRefCount;
     private boolean flushFlag;
 
+    // For synchronizing searchers with a concurrent merge.
+    private AtomicBoolean isMerging = new AtomicBoolean(false);
+    private AtomicInteger searcherRefCountA = new AtomicInteger(0);
+    private AtomicInteger searcherRefCountB = new AtomicInteger(0);
+    // Represents the current number of searcher threads that are operating on
+    // the unmerged on-disk BTrees.
+    // We alternate between searcherRefCountA and searcherRefCountB.
+    private AtomicInteger searcherRefCount = searcherRefCountA;
+    
     public LSMBTree(IBufferCache memBufferCache, InMemoryFreePageManager memFreePageManager,
             ITreeIndexFrameFactory interiorFrameFactory, ITreeIndexFrameFactory insertLeafFrameFactory,
             ITreeIndexFrameFactory deleteLeafFrameFactory, ILSMFileNameManager fileNameManager, BTreeFactory diskBTreeFactory,
@@ -332,40 +347,61 @@
     private BTree createDiskBTree(BTreeFactory factory, String fileName, boolean createBTree) throws HyracksDataException {
         // Register the new BTree file.        
         FileReference file = new FileReference(new File(fileName));
-        // TODO: Delete the file during cleanup.
+        // File will be deleted during cleanup of merge().
         diskBufferCache.createFile(file);
         int diskBTreeFileId = diskFileMapProvider.lookupFileId(file);
-        // TODO: Close the file during cleanup.
+        // File will be closed during cleanup of merge().
         diskBufferCache.openFile(diskBTreeFileId);
         // Create new BTree instance.
         BTree diskBTree = factory.createBTreeInstance(diskBTreeFileId);
         if (createBTree) {
             diskBTree.create(diskBTreeFileId);
         }
-        // TODO: Close the BTree during cleanup.
+        // BTree will be closed during cleanup of merge().
         diskBTree.open(diskBTreeFileId);
         return diskBTree;
     }
     
-    private void search(ITreeIndexCursor cursor, RangePredicate pred, LSMBTreeOpContext ctx, boolean includeMemBTree) throws HyracksDataException, TreeIndexException {                
-        boolean waitForFlush = true;
-        do {
-            synchronized (this) {
-                if (!flushFlag) {
-                    // The corresponding threadExit() is in LSMTreeRangeSearchCursor.close().
-                    threadEnter();
-                    waitForFlush = false;
+    private List<BTree> search(ITreeIndexCursor cursor, RangePredicate pred, LSMBTreeOpContext ctx, boolean includeMemBTree) throws HyracksDataException, TreeIndexException {                
+        // If the search doesn't include the in-memory BTree, then we don't have
+        // to synchronize with a flush.
+        if (includeMemBTree) {
+            boolean waitForFlush = true;
+            do {
+                synchronized (this) {
+                    if (!flushFlag) {
+                        // The corresponding threadExit() is in
+                        // LSMTreeRangeSearchCursor.close().
+                        threadEnter();
+                        waitForFlush = false;
+                    }
                 }
+            } while (waitForFlush);
+        }
+
+        // Get a snapshot of the current on-disk BTrees.
+        // If includeMemBTree is true, then no concurrent
+        // flush can add another on-disk BTree (due to threadEnter());
+        // If includeMemBTree is false, then it is possible that a concurrent
+        // flush adds another on-disk BTree.
+        // Since this mode is only used for merging trees, it doesn't really
+        // matter if the merge excludes the new on-disk BTree,
+        List<BTree> diskBTreesSnapshot = new ArrayList<BTree>();
+        AtomicInteger localSearcherRefCount = null;
+        synchronized (diskBTrees) {
+            diskBTreesSnapshot.addAll(diskBTrees);
+            // Only remember the search ref count when performing a merge (i.e., includeMemBTree is false).
+            if (!includeMemBTree) {
+                localSearcherRefCount = searcherRefCount;
+                localSearcherRefCount.incrementAndGet();
             }
-        } while (waitForFlush);
+        }
         
-        // TODO: Think about what happens with possibly concurrent merges.
         LSMBTreeRangeSearchCursor lsmTreeCursor = (LSMBTreeRangeSearchCursor) cursor;
-        int numDiskBTrees = diskBTrees.size();
-        int numBTrees = (includeMemBTree) ? numDiskBTrees + 1 : numDiskBTrees;        
-        ListIterator<BTree> diskBTreesIter = diskBTrees.listIterator();
+        int numDiskBTrees = diskBTreesSnapshot.size();
+        int numBTrees = (includeMemBTree) ? numDiskBTrees + 1 : numDiskBTrees;                
         LSMBTreeCursorInitialState initialState = new LSMBTreeCursorInitialState(numBTrees,
-                insertLeafFrameFactory, cmp, this);
+                insertLeafFrameFactory, cmp, this, includeMemBTree, localSearcherRefCount);
         lsmTreeCursor.open(initialState, pred);
         
         int cursorIx;
@@ -381,6 +417,7 @@
         // Open cursors of on-disk BTrees.
         ITreeIndexAccessor[] diskBTreeAccessors = new ITreeIndexAccessor[numDiskBTrees];
         int diskBTreeIx = 0;
+        ListIterator<BTree> diskBTreesIter = diskBTreesSnapshot.listIterator();
         while(diskBTreesIter.hasNext()) {
             BTree diskBTree = diskBTreesIter.next();
             diskBTreeAccessors[diskBTreeIx] = diskBTree.createAccessor();
@@ -389,6 +426,7 @@
             diskBTreeIx++;
         }
         lsmTreeCursor.initPriorityQueue();
+        return diskBTreesSnapshot;
     }
     
     private void insert(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
@@ -399,15 +437,26 @@
         lsmPerformOp(tuple, ctx);
     }
 
-    public void merge() throws Exception {
+    public void merge() throws HyracksDataException, TreeIndexException  {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Merging LSM-BTree.");
         }
+        if (isMerging.get()) {
+            throw new TreeIndexException("Merge already in progress in LSMBTree. Only one concurrent merge allowed.");
+        }
+        isMerging.set(true);
+        
+        // Point to the current searcher ref count, so we can wait for it later
+        // (after we swap the searcher ref count).
+        AtomicInteger localSearcherRefCount = searcherRefCount;
+        
         LSMBTreeOpContext ctx = createOpContext();
         ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor();
         RangePredicate rangePred = new RangePredicate(true, null, null, true, true, null, null);
         // Ordered scan, ignoring the in-memory BTree.
-        search(cursor, (RangePredicate) rangePred, ctx, false);
+        // We get back a snapshot of the on-disk BTrees that are going to be
+        // merged now, so we can clean them up after the merge has completed.
+        List<BTree> mergingDiskBTrees = search(cursor, (RangePredicate) rangePred, ctx, false);
 
         // Bulk load the tuples from all on-disk BTrees into the new BTree.
         BTree mergedBTree = createMergeTargetBTree();
@@ -422,6 +471,43 @@
             cursor.close();
         }
         mergedBTree.endBulkLoad(bulkLoadCtx);
+        
+        // Remove the old BTrees from the list, and add the new merged BTree.
+        // Also, swap the searchRefCount.
+        synchronized (diskBTrees) {
+            diskBTrees.removeAll(mergingDiskBTrees);
+            diskBTrees.addLast(mergedBTree);
+            // Swap the searcher ref count reference, and reset it to zero.
+            if (searcherRefCount == searcherRefCountA) {
+                searcherRefCount = searcherRefCountB;
+            } else {
+                searcherRefCount = searcherRefCountA;
+            }
+            searcherRefCount.set(0);
+        }
+        
+        // Wait for all searchers that are still accessing the old on-disk
+        // BTrees, then perform the final cleanup of the old BTrees.
+        while (localSearcherRefCount.get() != 0) {
+            try {
+                Thread.sleep(AFTER_MERGE_CLEANUP_SLEEP);
+            } catch (InterruptedException e) {
+                // Propagate the exception to the caller, so that an appropriate
+                // cleanup action can be taken.
+                throw new HyracksDataException(e);
+            }
+        }
+        
+        // Cleanup. At this point we have guaranteed that no searchers are
+        // touching the old on-disk BTrees (localSearcherRefCount == 0).
+        for (BTree oldBTree : mergingDiskBTrees) {
+            oldBTree.close();
+            FileReference fileRef = diskFileMapProvider.lookupFileName(oldBTree.getFileId());
+            diskBufferCache.closeFile(oldBTree.getFileId());
+            diskBufferCache.deleteFile(oldBTree.getFileId());
+            fileRef.getFile().delete();
+        }
+        isMerging.set(false);
     }
     
     public class LSMTreeBulkLoadContext implements IIndexBulkLoadContext {
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java
index 5391208..44806c7 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
@@ -22,17 +24,22 @@
 
 public class LSMBTreeCursorInitialState implements ICursorInitialState {
 
-	private int numBTrees;
-	private ITreeIndexFrameFactory leafFrameFactory;
-	private MultiComparator cmp;
-	private LSMBTree lsm;
+	private final int numBTrees;
+	private final ITreeIndexFrameFactory leafFrameFactory;
+	private final MultiComparator cmp;
+	private final LSMBTree lsmBTree;
+	private final boolean includeMemBTree;
+	private final AtomicInteger searcherRefCount;
 	
-	public LSMBTreeCursorInitialState(int numBTrees, ITreeIndexFrameFactory leafFrameFactory, MultiComparator cmp, LSMBTree lsm) {
-		this.numBTrees = numBTrees;
-		this.leafFrameFactory = leafFrameFactory;
-		this.cmp = cmp;
-		this.lsm = lsm;
-	}
+    public LSMBTreeCursorInitialState(int numBTrees, ITreeIndexFrameFactory leafFrameFactory, MultiComparator cmp,
+            LSMBTree lsmBTree, boolean includeMemBTree, AtomicInteger searcherRefCount) {
+        this.numBTrees = numBTrees;
+        this.leafFrameFactory = leafFrameFactory;
+        this.cmp = cmp;
+        this.lsmBTree = lsmBTree;
+        this.includeMemBTree = includeMemBTree;
+        this.searcherRefCount = searcherRefCount;
+    }
 	
 	public int getNumBTrees() {
 		return numBTrees;
@@ -56,7 +63,14 @@
 	}
 
 	public LSMBTree getLsm() {
-		return lsm;
+		return lsmBTree;
 	}
-
+	
+	public boolean getIncludeMemBTree() {
+	    return includeMemBTree;
+	}
+	
+	public AtomicInteger getSearcherRefCount() {
+	    return searcherRefCount;
+	}
 }
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 5bf3810..15c81cc 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -17,6 +17,7 @@
 
 import java.util.Comparator;
 import java.util.PriorityQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -40,6 +41,8 @@
     private PriorityQueueElement reusedElement;
     private boolean needPush;
     private LSMBTree lsmTree;
+    private boolean includeMemBTree;
+    private AtomicInteger searcherRefCount;
 
     public LSMBTreeRangeSearchCursor() {
         outputElement = null;
@@ -96,6 +99,8 @@
             IBTreeLeafFrame leafFrame = (IBTreeLeafFrame) lsmInitialState.getLeafFrameFactory().createFrame();
             rangeCursors[i] = new BTreeRangeSearchCursor(leafFrame, false);
         }
+        includeMemBTree = lsmInitialState.getIncludeMemBTree();
+        searcherRefCount = lsmInitialState.getSearcherRefCount();
         setPriorityQueueComparator();
     }
     
@@ -118,10 +123,17 @@
             rangeCursors[i].close();
         }
         rangeCursors = null;
-        try {
-            lsmTree.threadExit();
-        } catch (TreeIndexException e) {
-            throw new HyracksDataException(e);
+        // If the in-memory BTree was not included in the search, then we don't
+        // need to synchronize with a flush.
+        if (includeMemBTree) {
+            try {
+                lsmTree.threadExit();
+            } catch (TreeIndexException e) {
+                throw new HyracksDataException(e);
+            }
+        } else {
+            // Synchronize with ongoing merges.
+            searcherRefCount.decrementAndGet();
         }
     }