Implemented LSM-BTree merge including correct behavior with concurrent accesses.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1081 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 0072cf2..005e712 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -17,10 +17,14 @@
import java.io.File;
import java.io.FilenameFilter;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedList;
+import java.util.List;
import java.util.ListIterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -50,6 +54,7 @@
public class LSMBTree implements ILSMTree {
protected final Logger LOGGER = Logger.getLogger(LSMBTree.class.getName());
+ private static final long AFTER_MERGE_CLEANUP_SLEEP = 100;
// In-memory components.
private final BTree memBTree;
@@ -71,10 +76,20 @@
private final ITreeIndexFrameFactory deleteLeafFrameFactory;
private final MultiComparator cmp;
- // For dealing with concurrent accesses.
+ // For synchronizing all operations with flushes.
+ // Currently, all operations block during a flush.
private int threadRefCount;
private boolean flushFlag;
+ // For synchronizing searchers with a concurrent merge.
+ private AtomicBoolean isMerging = new AtomicBoolean(false);
+ private AtomicInteger searcherRefCountA = new AtomicInteger(0);
+ private AtomicInteger searcherRefCountB = new AtomicInteger(0);
+ // Represents the current number of searcher threads that are operating on
+ // the unmerged on-disk BTrees.
+ // We alternate between searcherRefCountA and searcherRefCountB.
+ private AtomicInteger searcherRefCount = searcherRefCountA;
+
public LSMBTree(IBufferCache memBufferCache, InMemoryFreePageManager memFreePageManager,
ITreeIndexFrameFactory interiorFrameFactory, ITreeIndexFrameFactory insertLeafFrameFactory,
ITreeIndexFrameFactory deleteLeafFrameFactory, ILSMFileNameManager fileNameManager, BTreeFactory diskBTreeFactory,
@@ -332,40 +347,61 @@
private BTree createDiskBTree(BTreeFactory factory, String fileName, boolean createBTree) throws HyracksDataException {
// Register the new BTree file.
FileReference file = new FileReference(new File(fileName));
- // TODO: Delete the file during cleanup.
+ // File will be deleted during cleanup of merge().
diskBufferCache.createFile(file);
int diskBTreeFileId = diskFileMapProvider.lookupFileId(file);
- // TODO: Close the file during cleanup.
+ // File will be closed during cleanup of merge().
diskBufferCache.openFile(diskBTreeFileId);
// Create new BTree instance.
BTree diskBTree = factory.createBTreeInstance(diskBTreeFileId);
if (createBTree) {
diskBTree.create(diskBTreeFileId);
}
- // TODO: Close the BTree during cleanup.
+ // BTree will be closed during cleanup of merge().
diskBTree.open(diskBTreeFileId);
return diskBTree;
}
- private void search(ITreeIndexCursor cursor, RangePredicate pred, LSMBTreeOpContext ctx, boolean includeMemBTree) throws HyracksDataException, TreeIndexException {
- boolean waitForFlush = true;
- do {
- synchronized (this) {
- if (!flushFlag) {
- // The corresponding threadExit() is in LSMTreeRangeSearchCursor.close().
- threadEnter();
- waitForFlush = false;
+ private List<BTree> search(ITreeIndexCursor cursor, RangePredicate pred, LSMBTreeOpContext ctx, boolean includeMemBTree) throws HyracksDataException, TreeIndexException {
+ // If the search doesn't include the in-memory BTree, then we don't have
+ // to synchronize with a flush.
+ if (includeMemBTree) {
+ boolean waitForFlush = true;
+ do {
+ synchronized (this) {
+ if (!flushFlag) {
+ // The corresponding threadExit() is in
+ // LSMTreeRangeSearchCursor.close().
+ threadEnter();
+ waitForFlush = false;
+ }
}
+ } while (waitForFlush);
+ }
+
+ // Get a snapshot of the current on-disk BTrees.
+ // If includeMemBTree is true, then no concurrent
+ // flush can add another on-disk BTree (due to threadEnter());
+ // If includeMemBTree is false, then it is possible that a concurrent
+ // flush adds another on-disk BTree.
+ // Since this mode is only used for merging trees, it doesn't really
+ // matter if the merge excludes the new on-disk BTree,
+ List<BTree> diskBTreesSnapshot = new ArrayList<BTree>();
+ AtomicInteger localSearcherRefCount = null;
+ synchronized (diskBTrees) {
+ diskBTreesSnapshot.addAll(diskBTrees);
+ // Only remember the search ref count when performing a merge (i.e., includeMemBTree is false).
+ if (!includeMemBTree) {
+ localSearcherRefCount = searcherRefCount;
+ localSearcherRefCount.incrementAndGet();
}
- } while (waitForFlush);
+ }
- // TODO: Think about what happens with possibly concurrent merges.
LSMBTreeRangeSearchCursor lsmTreeCursor = (LSMBTreeRangeSearchCursor) cursor;
- int numDiskBTrees = diskBTrees.size();
- int numBTrees = (includeMemBTree) ? numDiskBTrees + 1 : numDiskBTrees;
- ListIterator<BTree> diskBTreesIter = diskBTrees.listIterator();
+ int numDiskBTrees = diskBTreesSnapshot.size();
+ int numBTrees = (includeMemBTree) ? numDiskBTrees + 1 : numDiskBTrees;
LSMBTreeCursorInitialState initialState = new LSMBTreeCursorInitialState(numBTrees,
- insertLeafFrameFactory, cmp, this);
+ insertLeafFrameFactory, cmp, this, includeMemBTree, localSearcherRefCount);
lsmTreeCursor.open(initialState, pred);
int cursorIx;
@@ -381,6 +417,7 @@
// Open cursors of on-disk BTrees.
ITreeIndexAccessor[] diskBTreeAccessors = new ITreeIndexAccessor[numDiskBTrees];
int diskBTreeIx = 0;
+ ListIterator<BTree> diskBTreesIter = diskBTreesSnapshot.listIterator();
while(diskBTreesIter.hasNext()) {
BTree diskBTree = diskBTreesIter.next();
diskBTreeAccessors[diskBTreeIx] = diskBTree.createAccessor();
@@ -389,6 +426,7 @@
diskBTreeIx++;
}
lsmTreeCursor.initPriorityQueue();
+ return diskBTreesSnapshot;
}
private void insert(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
@@ -399,15 +437,26 @@
lsmPerformOp(tuple, ctx);
}
- public void merge() throws Exception {
+ public void merge() throws HyracksDataException, TreeIndexException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Merging LSM-BTree.");
}
+ if (isMerging.get()) {
+ throw new TreeIndexException("Merge already in progress in LSMBTree. Only one concurrent merge allowed.");
+ }
+ isMerging.set(true);
+
+ // Point to the current searcher ref count, so we can wait for it later
+ // (after we swap the searcher ref count).
+ AtomicInteger localSearcherRefCount = searcherRefCount;
+
LSMBTreeOpContext ctx = createOpContext();
ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor();
RangePredicate rangePred = new RangePredicate(true, null, null, true, true, null, null);
// Ordered scan, ignoring the in-memory BTree.
- search(cursor, (RangePredicate) rangePred, ctx, false);
+ // We get back a snapshot of the on-disk BTrees that are going to be
+ // merged now, so we can clean them up after the merge has completed.
+ List<BTree> mergingDiskBTrees = search(cursor, (RangePredicate) rangePred, ctx, false);
// Bulk load the tuples from all on-disk BTrees into the new BTree.
BTree mergedBTree = createMergeTargetBTree();
@@ -422,6 +471,43 @@
cursor.close();
}
mergedBTree.endBulkLoad(bulkLoadCtx);
+
+ // Remove the old BTrees from the list, and add the new merged BTree.
+ // Also, swap the searchRefCount.
+ synchronized (diskBTrees) {
+ diskBTrees.removeAll(mergingDiskBTrees);
+ diskBTrees.addLast(mergedBTree);
+ // Swap the searcher ref count reference, and reset it to zero.
+ if (searcherRefCount == searcherRefCountA) {
+ searcherRefCount = searcherRefCountB;
+ } else {
+ searcherRefCount = searcherRefCountA;
+ }
+ searcherRefCount.set(0);
+ }
+
+ // Wait for all searchers that are still accessing the old on-disk
+ // BTrees, then perform the final cleanup of the old BTrees.
+ while (localSearcherRefCount.get() != 0) {
+ try {
+ Thread.sleep(AFTER_MERGE_CLEANUP_SLEEP);
+ } catch (InterruptedException e) {
+ // Propagate the exception to the caller, so that an appropriate
+ // cleanup action can be taken.
+ throw new HyracksDataException(e);
+ }
+ }
+
+ // Cleanup. At this point we have guaranteed that no searchers are
+ // touching the old on-disk BTrees (localSearcherRefCount == 0).
+ for (BTree oldBTree : mergingDiskBTrees) {
+ oldBTree.close();
+ FileReference fileRef = diskFileMapProvider.lookupFileName(oldBTree.getFileId());
+ diskBufferCache.closeFile(oldBTree.getFileId());
+ diskBufferCache.deleteFile(oldBTree.getFileId());
+ fileRef.getFile().delete();
+ }
+ isMerging.set(false);
}
public class LSMTreeBulkLoadContext implements IIndexBulkLoadContext {
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java
index 5391208..44806c7 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
+import java.util.concurrent.atomic.AtomicInteger;
+
import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
@@ -22,17 +24,22 @@
public class LSMBTreeCursorInitialState implements ICursorInitialState {
- private int numBTrees;
- private ITreeIndexFrameFactory leafFrameFactory;
- private MultiComparator cmp;
- private LSMBTree lsm;
+ private final int numBTrees;
+ private final ITreeIndexFrameFactory leafFrameFactory;
+ private final MultiComparator cmp;
+ private final LSMBTree lsmBTree;
+ private final boolean includeMemBTree;
+ private final AtomicInteger searcherRefCount;
- public LSMBTreeCursorInitialState(int numBTrees, ITreeIndexFrameFactory leafFrameFactory, MultiComparator cmp, LSMBTree lsm) {
- this.numBTrees = numBTrees;
- this.leafFrameFactory = leafFrameFactory;
- this.cmp = cmp;
- this.lsm = lsm;
- }
+ public LSMBTreeCursorInitialState(int numBTrees, ITreeIndexFrameFactory leafFrameFactory, MultiComparator cmp,
+ LSMBTree lsmBTree, boolean includeMemBTree, AtomicInteger searcherRefCount) {
+ this.numBTrees = numBTrees;
+ this.leafFrameFactory = leafFrameFactory;
+ this.cmp = cmp;
+ this.lsmBTree = lsmBTree;
+ this.includeMemBTree = includeMemBTree;
+ this.searcherRefCount = searcherRefCount;
+ }
public int getNumBTrees() {
return numBTrees;
@@ -56,7 +63,14 @@
}
public LSMBTree getLsm() {
- return lsm;
+ return lsmBTree;
}
-
+
+ public boolean getIncludeMemBTree() {
+ return includeMemBTree;
+ }
+
+ public AtomicInteger getSearcherRefCount() {
+ return searcherRefCount;
+ }
}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 5bf3810..15c81cc 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -17,6 +17,7 @@
import java.util.Comparator;
import java.util.PriorityQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -40,6 +41,8 @@
private PriorityQueueElement reusedElement;
private boolean needPush;
private LSMBTree lsmTree;
+ private boolean includeMemBTree;
+ private AtomicInteger searcherRefCount;
public LSMBTreeRangeSearchCursor() {
outputElement = null;
@@ -96,6 +99,8 @@
IBTreeLeafFrame leafFrame = (IBTreeLeafFrame) lsmInitialState.getLeafFrameFactory().createFrame();
rangeCursors[i] = new BTreeRangeSearchCursor(leafFrame, false);
}
+ includeMemBTree = lsmInitialState.getIncludeMemBTree();
+ searcherRefCount = lsmInitialState.getSearcherRefCount();
setPriorityQueueComparator();
}
@@ -118,10 +123,17 @@
rangeCursors[i].close();
}
rangeCursors = null;
- try {
- lsmTree.threadExit();
- } catch (TreeIndexException e) {
- throw new HyracksDataException(e);
+ // If the in-memory BTree was not included in the search, then we don't
+ // need to synchronize with a flush.
+ if (includeMemBTree) {
+ try {
+ lsmTree.threadExit();
+ } catch (TreeIndexException e) {
+ throw new HyracksDataException(e);
+ }
+ } else {
+ // Synchronize with ongoing merges.
+ searcherRefCount.decrementAndGet();
}
}