Factored out thread-concurrency code of LSM-Trees into a common harness. Modified LSMBTree and LSMRTree to use the harness.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1104 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 12b47dd..6358e87 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -17,15 +17,11 @@
import java.io.File;
import java.io.FilenameFilter;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -38,7 +34,9 @@
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoadContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
@@ -49,16 +47,19 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileNameManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMTree;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
public class LSMBTree implements ILSMTree {
protected final Logger LOGGER = Logger.getLogger(LSMBTree.class.getName());
- private static final long AFTER_MERGE_CLEANUP_SLEEP = 100;
+
+ private final LSMHarness lsmHarness;
// In-memory components.
private final BTree memBTree;
- private final InMemoryFreePageManager memFreePageManager;
+ private final InMemoryFreePageManager memFreePageManager;
// On-disk components.
private final ILSMFileNameManager fileNameManager;
@@ -69,27 +70,14 @@
private final BTreeFactory bulkLoadBTreeFactory;
private final IBufferCache diskBufferCache;
private final IFileMapProvider diskFileMapProvider;
- private LinkedList<BTree> diskBTrees = new LinkedList<BTree>();
+ // List of BTree instances. Using Object for better sharing via ILSMTree + LSMHarness.
+ private LinkedList<Object> diskBTrees = new LinkedList<Object>();
// Common for in-memory and on-disk components.
private final ITreeIndexFrameFactory insertLeafFrameFactory;
private final ITreeIndexFrameFactory deleteLeafFrameFactory;
private final MultiComparator cmp;
- // For synchronizing all operations with flushes.
- // Currently, all operations block during a flush.
- private int threadRefCount;
- private boolean flushFlag;
-
- // For synchronizing searchers with a concurrent merge.
- private AtomicBoolean isMerging = new AtomicBoolean(false);
- private AtomicInteger searcherRefCountA = new AtomicInteger(0);
- private AtomicInteger searcherRefCountB = new AtomicInteger(0);
- // Represents the current number of searcher threads that are operating on
- // the unmerged on-disk BTrees.
- // We alternate between searcherRefCountA and searcherRefCountB.
- private AtomicInteger searcherRefCount = searcherRefCountA;
-
public LSMBTree(IBufferCache memBufferCache, InMemoryFreePageManager memFreePageManager,
ITreeIndexFrameFactory interiorFrameFactory, ITreeIndexFrameFactory insertLeafFrameFactory,
ITreeIndexFrameFactory deleteLeafFrameFactory, ILSMFileNameManager fileNameManager, BTreeFactory diskBTreeFactory,
@@ -104,10 +92,9 @@
this.diskBTreeFactory = diskBTreeFactory;
this.bulkLoadBTreeFactory = bulkLoadBTreeFactory;
this.cmp = cmp;
- this.diskBTrees = new LinkedList<BTree>();
- this.threadRefCount = 0;
- this.flushFlag = false;
+ this.diskBTrees = new LinkedList<Object>();
this.fileNameManager = fileNameManager;
+ lsmHarness = new LSMHarness(this);
}
@Override
@@ -154,7 +141,8 @@
@Override
public void close() throws HyracksDataException {
- for (BTree btree : diskBTrees) {
+ for (Object o : diskBTrees) {
+ BTree btree = (BTree) o;
diskBufferCache.closeFile(btree.getFileId());
btree.close();
}
@@ -162,37 +150,25 @@
memBTree.close();
}
- private void lsmPerformOp(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
- boolean waitForFlush = true;
- do {
- // Wait for ongoing flush to complete.
- synchronized (this) {
- if (!flushFlag) {
- // Increments threadRefCount, to force a flush to wait for this operation to finish.
- // (a flush can only begin once threadRefCount == 0).
- threadEnter();
- // Proceed with operation.
- waitForFlush = false;
- }
- }
- } while (waitForFlush);
- // TODO: This will become much simpler once the BTree supports a true upsert operation.
- try {
- ctx.memBTreeAccessor.insert(tuple);
- } catch (BTreeDuplicateKeyException e) {
- // Notice that a flush must wait for the current operation to
- // finish (threadRefCount must reach zero).
- // TODO: This methods below are very inefficient, we'd rather like
+ @Override
+ public void insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException, TreeIndexException {
+ LSMBTreeOpContext ctx = (LSMBTreeOpContext) ictx;
+ // TODO: This will become much simpler once the BTree supports a true upsert operation.
+ try {
+ ctx.memBTreeAccessor.insert(tuple);
+ } catch (BTreeDuplicateKeyException e) {
+ // Notice that a flush must wait for the current operation to
+ // finish (the LSMHarness synchronizes operations with flushes).
+ // TODO: The methods below are very inefficient, we'd rather like
// to flip the antimatter bit one single BTree traversal.
- if (ctx.getIndexOp() == IndexOp.DELETE) {
- deleteExistingKey(tuple, ctx);
- } else {
- insertOrUpdateExistingKey(tuple, ctx);
- }
- }
- threadExit();
- }
-
+ if (ctx.getIndexOp() == IndexOp.DELETE) {
+ deleteExistingKey(tuple, ctx);
+ } else {
+ insertOrUpdateExistingKey(tuple, ctx);
+ }
+ }
+ }
+
private void deleteExistingKey(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
// We assume that tuple given by the user for deletion only contains the
// key fields, but not any non-key fields.
@@ -218,7 +194,7 @@
// There is a remote chance of livelocks due to this behavior.
if (tupleCopy == null) {
ctx.reset(IndexOp.DELETE);
- lsmPerformOp(tuple, ctx);
+ lsmHarness.insertUpdateOrDelete(tuple, ctx);
return;
}
memBTreeUpdate(tupleCopy, ctx);
@@ -259,7 +235,7 @@
}
// Restart performOp to insert the tuple.
ctx.reset(originalOp);
- lsmPerformOp(tuple, ctx);
+ lsmHarness.insertUpdateOrDelete(tuple, ctx);
}
private void memBTreeUpdate(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException,
@@ -273,34 +249,12 @@
// Simply restart the operation. There is a remote chance of
// livelocks due to this behavior.
ctx.reset(originalOp);
- lsmPerformOp(tuple, ctx);
- }
- }
-
- public void threadEnter() {
- threadRefCount++;
- }
-
- public void threadExit() throws HyracksDataException, TreeIndexException {
- synchronized (this) {
- threadRefCount--;
- // Check if we've reached or exceeded the maximum number of pages.
- if (!flushFlag && memFreePageManager.isFull()) {
- flushFlag = true;
- }
- // Flush will only be handled by last exiting thread.
- if (flushFlag && threadRefCount == 0) {
- flush();
- flushFlag = false;
- }
+ lsmHarness.insertUpdateOrDelete(tuple, ctx);
}
}
@Override
- public void flush() throws HyracksDataException, TreeIndexException {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Flushing LSM-BTree.");
- }
+ public ITreeIndex flush() throws HyracksDataException, TreeIndexException {
// Bulk load a new on-disk BTree from the in-memory BTree.
RangePredicate nullPred = new RangePredicate(null, null, true, true, null, null);
ITreeIndexAccessor memBTreeAccessor = memBTree.createAccessor();
@@ -318,13 +272,16 @@
scanCursor.close();
}
diskBTree.endBulkLoad(bulkLoadCtx);
- resetMemBTree();
- synchronized (diskBTrees) {
- diskBTrees.addFirst(diskBTree);
- }
+ return diskBTree;
}
- private void resetMemBTree() throws HyracksDataException {
+ @Override
+ public void addFlushedComponent(Object index) {
+ diskBTrees.addFirst(index);
+ }
+
+ @Override
+ public void resetInMemoryComponent() throws HyracksDataException {
memFreePageManager.reset();
memBTree.create(memBTree.getFileId());
}
@@ -364,50 +321,17 @@
return diskBTree;
}
- private List<BTree> search(ITreeIndexCursor cursor, RangePredicate pred, LSMBTreeOpContext ctx, boolean includeMemBTree) throws HyracksDataException, TreeIndexException {
- // If the search doesn't include the in-memory BTree, then we don't have
- // to synchronize with a flush.
- if (includeMemBTree) {
- boolean waitForFlush = true;
- do {
- synchronized (this) {
- if (!flushFlag) {
- // The corresponding threadExit() is in
- // LSMTreeRangeSearchCursor.close().
- threadEnter();
- waitForFlush = false;
- }
- }
- } while (waitForFlush);
- }
-
- // Get a snapshot of the current on-disk BTrees.
- // If includeMemBTree is true, then no concurrent
- // flush can add another on-disk BTree (due to threadEnter());
- // If includeMemBTree is false, then it is possible that a concurrent
- // flush adds another on-disk BTree.
- // Since this mode is only used for merging trees, it doesn't really
- // matter if the merge excludes the new on-disk BTree.
- List<BTree> diskBTreesSnapshot = new ArrayList<BTree>();
- AtomicInteger localSearcherRefCount = null;
- synchronized (diskBTrees) {
- diskBTreesSnapshot.addAll(diskBTrees);
- // Only remember the search ref count when performing a merge (i.e., includeMemBTree is false).
- if (!includeMemBTree) {
- localSearcherRefCount = searcherRefCount;
- localSearcherRefCount.incrementAndGet();
- }
- }
-
+ public void search(ITreeIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred, IIndexOpContext ictx, boolean includeMemComponent) throws HyracksDataException, TreeIndexException {
+ LSMBTreeOpContext ctx = (LSMBTreeOpContext) ictx;
LSMBTreeRangeSearchCursor lsmTreeCursor = (LSMBTreeRangeSearchCursor) cursor;
- int numDiskBTrees = diskBTreesSnapshot.size();
- int numBTrees = (includeMemBTree) ? numDiskBTrees + 1 : numDiskBTrees;
+ int numDiskBTrees = diskComponents.size();
+ int numBTrees = (includeMemComponent) ? numDiskBTrees + 1 : numDiskBTrees;
LSMBTreeCursorInitialState initialState = new LSMBTreeCursorInitialState(numBTrees,
- insertLeafFrameFactory, cmp, this, includeMemBTree, localSearcherRefCount);
+ insertLeafFrameFactory, cmp, includeMemComponent, lsmHarness);
lsmTreeCursor.open(initialState, pred);
int cursorIx;
- if (includeMemBTree) {
+ if (includeMemComponent) {
// Open cursor of in-memory BTree at index 0.
ctx.memBTreeAccessor.search(lsmTreeCursor.getCursor(0), pred);
// Skip 0 because it is the in-memory BTree.
@@ -419,46 +343,27 @@
// Open cursors of on-disk BTrees.
ITreeIndexAccessor[] diskBTreeAccessors = new ITreeIndexAccessor[numDiskBTrees];
int diskBTreeIx = 0;
- ListIterator<BTree> diskBTreesIter = diskBTreesSnapshot.listIterator();
+ ListIterator<Object> diskBTreesIter = diskComponents.listIterator();
while(diskBTreesIter.hasNext()) {
- BTree diskBTree = diskBTreesIter.next();
+ BTree diskBTree = (BTree) diskBTreesIter.next();
diskBTreeAccessors[diskBTreeIx] = diskBTree.createAccessor();
diskBTreeAccessors[diskBTreeIx].search(lsmTreeCursor.getCursor(cursorIx), pred);
cursorIx++;
diskBTreeIx++;
}
lsmTreeCursor.initPriorityQueue();
- return diskBTreesSnapshot;
}
- private void insert(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
- lsmPerformOp(tuple, ctx);
- }
-
- private void delete(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
- lsmPerformOp(tuple, ctx);
- }
-
- public void merge() throws HyracksDataException, TreeIndexException {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Merging LSM-BTree.");
- }
- if (!isMerging.compareAndSet(false, true)) {
- throw new TreeIndexException("Merge already in progress in LSMBTree. Only one concurrent merge allowed.");
- }
-
- // Point to the current searcher ref count, so we can wait for it later
- // (after we swap the searcher ref count).
- AtomicInteger localSearcherRefCount = searcherRefCount;
-
+ public ITreeIndex merge(List<Object> mergedComponents) throws HyracksDataException, TreeIndexException {
LSMBTreeOpContext ctx = createOpContext();
ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor();
RangePredicate rangePred = new RangePredicate(null, null, true, true, null, null);
// Ordered scan, ignoring the in-memory BTree.
// We get back a snapshot of the on-disk BTrees that are going to be
// merged now, so we can clean them up after the merge has completed.
- List<BTree> mergingDiskBTrees = search(cursor, (RangePredicate) rangePred, ctx, false);
-
+ List<Object> mergingDiskBTrees = lsmHarness.search(cursor, (RangePredicate) rangePred, ctx, false);
+ mergedComponents.addAll(mergingDiskBTrees);
+
// Bulk load the tuples from all on-disk BTrees into the new BTree.
BTree mergedBTree = createMergeTargetBTree();
IIndexBulkLoadContext bulkLoadCtx = mergedBTree.beginBulkLoad(1.0f);
@@ -472,42 +377,34 @@
cursor.close();
}
mergedBTree.endBulkLoad(bulkLoadCtx);
-
- // Remove the old BTrees from the list, and add the new merged BTree.
- // Also, swap the searchRefCount.
- synchronized (diskBTrees) {
- diskBTrees.removeAll(mergingDiskBTrees);
- diskBTrees.addLast(mergedBTree);
- // Swap the searcher ref count reference, and reset it to zero.
- if (searcherRefCount == searcherRefCountA) {
- searcherRefCount = searcherRefCountB;
- } else {
- searcherRefCount = searcherRefCountA;
- }
- searcherRefCount.set(0);
- }
-
- // Wait for all searchers that are still accessing the old on-disk
- // BTrees, then perform the final cleanup of the old BTrees.
- while (localSearcherRefCount.get() != 0) {
- try {
- Thread.sleep(AFTER_MERGE_CLEANUP_SLEEP);
- } catch (InterruptedException e) {
- // Propagate the exception to the caller, so that an appropriate
- // cleanup action can be taken.
- throw new HyracksDataException(e);
- }
- }
-
- // Cleanup. At this point we have guaranteed that no searchers are
- // touching the old on-disk BTrees (localSearcherRefCount == 0).
- for (BTree oldBTree : mergingDiskBTrees) {
+ return mergedBTree;
+ }
+
+ @Override
+ public void addMergedComponent(Object newComponent, List<Object> mergedComponents) {
+ diskBTrees.removeAll(mergedComponents);
+ diskBTrees.addLast(newComponent);
+ }
+
+ @Override
+ public void cleanUpAfterMerge(List<Object> mergedComponents) throws HyracksDataException {
+ for (Object o : mergedComponents) {
+ BTree oldBTree = (BTree) o;
FileReference fileRef = diskFileMapProvider.lookupFileName(oldBTree.getFileId());
diskBufferCache.closeFile(oldBTree.getFileId());
oldBTree.close();
fileRef.getFile().delete();
}
- isMerging.set(false);
+ }
+
+ @Override
+ public InMemoryFreePageManager getInMemoryFreePageManager() {
+ return (InMemoryFreePageManager) memBTree.getFreePageManager();
+ }
+
+ @Override
+ public List<Object> getDiskComponents() {
+ return diskBTrees;
}
public class LSMTreeBulkLoadContext implements IIndexBulkLoadContext {
@@ -548,10 +445,8 @@
@Override
public void endBulkLoad(IIndexBulkLoadContext ictx) throws HyracksDataException {
LSMTreeBulkLoadContext bulkLoadCtx = (LSMTreeBulkLoadContext) ictx;
- bulkLoadCtx.getBTree().endBulkLoad(bulkLoadCtx.getBulkLoadCtx());
- synchronized (diskBTrees) {
- diskBTrees.addFirst(bulkLoadCtx.getBTree());
- }
+ bulkLoadCtx.getBTree().endBulkLoad(bulkLoadCtx.getBulkLoadCtx());
+ lsmHarness.addBulkLoadedComponent(bulkLoadCtx.getBTree());
}
@Override
@@ -584,6 +479,11 @@
return memBTree.getIndexType();
}
+ @Override
+ public int getFileId() {
+ return memBTree.getFileId();
+ }
+
public MultiComparator getMultiComparator() {
return cmp;
}
@@ -594,59 +494,17 @@
@Override
public ITreeIndexAccessor createAccessor() {
- return new LSMTreeIndexAccessor(this);
+ return new LSMBTreeIndexAccessor(lsmHarness, createOpContext());
}
-
- private class LSMTreeIndexAccessor implements ITreeIndexAccessor {
- private LSMBTree lsmTree;
- private LSMBTreeOpContext ctx;
-
- public LSMTreeIndexAccessor(LSMBTree lsmTree) {
- this.lsmTree = lsmTree;
- this.ctx = lsmTree.createOpContext();
- }
-
- @Override
- public void insert(ITupleReference tuple) throws HyracksDataException, TreeIndexException {
- ctx.reset(IndexOp.INSERT);
- lsmTree.insert(tuple, ctx);
- }
-
- @Override
- public void update(ITupleReference tuple) throws HyracksDataException, TreeIndexException {
- // Update is the same as insert.
- ctx.reset(IndexOp.INSERT);
- insert(tuple);
- }
-
- @Override
- public void delete(ITupleReference tuple) throws HyracksDataException, TreeIndexException {
- ctx.reset(IndexOp.DELETE);
- lsmTree.delete(tuple, ctx);
+
+ private class LSMBTreeIndexAccessor extends LSMTreeIndexAccessor {
+ public LSMBTreeIndexAccessor(LSMHarness lsmHarness, IIndexOpContext ctx) {
+ super(lsmHarness, ctx);
}
@Override
public ITreeIndexCursor createSearchCursor() {
return new LSMBTreeRangeSearchCursor();
}
-
- @Override
- public void search(ITreeIndexCursor cursor, ISearchPredicate searchPred) throws HyracksDataException,
- TreeIndexException {
- ctx.reset(IndexOp.SEARCH);
- lsmTree.search(cursor, (RangePredicate) searchPred, ctx, true);
- }
-
- @Override
- public ITreeIndexCursor createDiskOrderScanCursor() {
- // Disk-order scan doesn't make sense for the LSMBTree because it cannot correctly resolve deleted tuples.
- throw new UnsupportedOperationException("DiskOrderScan not supported by LSMTree.");
- }
-
- @Override
- public void diskOrderScan(ITreeIndexCursor cursor) throws HyracksDataException {
- // Disk-order scan doesn't make sense for the LSMBTree because it cannot correctly resolve deleted tuples.
- throw new UnsupportedOperationException("DiskOrderScan not supported by LSMTree.");
- }
}
}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java
index 44806c7..22b85dd 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java
@@ -15,11 +15,10 @@
package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
-import java.util.concurrent.atomic.AtomicInteger;
-
import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
public class LSMBTreeCursorInitialState implements ICursorInitialState {
@@ -27,18 +26,16 @@
private final int numBTrees;
private final ITreeIndexFrameFactory leafFrameFactory;
private final MultiComparator cmp;
- private final LSMBTree lsmBTree;
private final boolean includeMemBTree;
- private final AtomicInteger searcherRefCount;
+ private final LSMHarness lsmHarness;
public LSMBTreeCursorInitialState(int numBTrees, ITreeIndexFrameFactory leafFrameFactory, MultiComparator cmp,
- LSMBTree lsmBTree, boolean includeMemBTree, AtomicInteger searcherRefCount) {
+ boolean includeMemBTree, LSMHarness lsmHarness) {
this.numBTrees = numBTrees;
this.leafFrameFactory = leafFrameFactory;
this.cmp = cmp;
- this.lsmBTree = lsmBTree;
this.includeMemBTree = includeMemBTree;
- this.searcherRefCount = searcherRefCount;
+ this.lsmHarness = lsmHarness;
}
public int getNumBTrees() {
@@ -62,15 +59,11 @@
public void setPage(ICachedPage page) {
}
- public LSMBTree getLsm() {
- return lsmBTree;
- }
-
public boolean getIncludeMemBTree() {
return includeMemBTree;
}
- public AtomicInteger getSearcherRefCount() {
- return searcherRefCount;
+ public LSMHarness getLSMHarness() {
+ return lsmHarness;
}
}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 15c81cc..46f563f 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -17,7 +17,6 @@
import java.util.Comparator;
import java.util.PriorityQueue;
-import java.util.concurrent.atomic.AtomicInteger;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -26,9 +25,9 @@
import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
@@ -40,9 +39,8 @@
private PriorityQueueElement outputElement;
private PriorityQueueElement reusedElement;
private boolean needPush;
- private LSMBTree lsmTree;
private boolean includeMemBTree;
- private AtomicInteger searcherRefCount;
+ private LSMHarness lsmHarness;
public LSMBTreeRangeSearchCursor() {
outputElement = null;
@@ -91,7 +89,6 @@
@Override
public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
LSMBTreeCursorInitialState lsmInitialState = (LSMBTreeCursorInitialState) initialState;
- lsmTree = lsmInitialState.getLsm();
cmp = lsmInitialState.getCmp();
int numBTrees = lsmInitialState.getNumBTrees();
rangeCursors = new BTreeRangeSearchCursor[numBTrees];
@@ -100,7 +97,7 @@
rangeCursors[i] = new BTreeRangeSearchCursor(leafFrame, false);
}
includeMemBTree = lsmInitialState.getIncludeMemBTree();
- searcherRefCount = lsmInitialState.getSearcherRefCount();
+ lsmHarness = lsmInitialState.getLSMHarness();
setPriorityQueueComparator();
}
@@ -118,22 +115,14 @@
@Override
public void close() throws HyracksDataException {
- outputPriorityQueue.clear();
- for (int i = 0; i < rangeCursors.length; i++) {
- rangeCursors[i].close();
- }
- rangeCursors = null;
- // If the in-memory BTree was not included in the search, then we don't
- // need to synchronize with a flush.
- if (includeMemBTree) {
- try {
- lsmTree.threadExit();
- } catch (TreeIndexException e) {
- throw new HyracksDataException(e);
+ try {
+ outputPriorityQueue.clear();
+ for (int i = 0; i < rangeCursors.length; i++) {
+ rangeCursors[i].close();
}
- } else {
- // Synchronize with ongoing merges.
- searcherRefCount.decrementAndGet();
+ rangeCursors = null;
+ } finally {
+ lsmHarness.closeSearchCursor(includeMemBTree);
}
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMTree.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMTree.java
index 94dc6b3..69068e2 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMTree.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMTree.java
@@ -15,12 +15,46 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+import java.util.List;
+
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
+/**
+ * Methods to be implemented by an LSM index, which are called from LSMHarness.
+ * The implementations of the methods below should be thread agnostic.
+ * Synchronization of LSM operations like updates/searches/flushes/merges is
+ * done by the LSMHarness. For example, a flush() implementation should only
+ * create and return the new on-disk component, ignoring the fact that
+ * concurrent searches/updates/merges may be ongoing.
+ *
+ */
public interface ILSMTree extends ITreeIndex {
- public void merge() throws HyracksDataException, TreeIndexException;
+ public void insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException,
+ TreeIndexException;
- public void flush() throws HyracksDataException, TreeIndexException;
+ public void search(ITreeIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred,
+ IIndexOpContext ictx, boolean includeMemComponent) throws HyracksDataException, TreeIndexException;
+
+ public Object merge(List<Object> mergedComponents) throws HyracksDataException, TreeIndexException;
+
+ public void addMergedComponent(Object newComponent, List<Object> mergedComponents);
+
+ public void cleanUpAfterMerge(List<Object> mergedComponents) throws HyracksDataException;
+
+ public Object flush() throws HyracksDataException, TreeIndexException;
+
+ public void addFlushedComponent(Object index);
+
+ public InMemoryFreePageManager getInMemoryFreePageManager();
+
+ public void resetInMemoryComponent() throws HyracksDataException;
+
+ public List<Object> getDiskComponents();
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMTreeIndexAccessor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMTreeIndexAccessor.java
new file mode 100644
index 0000000..0289dd6
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMTreeIndexAccessor.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+
+/**
+ * Client handle for performing operations
+ * (insert/delete/update/search/diskorderscan/merge/flush) on an ILSMTree. An
+ * ILSMTreeIndexAccessor is not thread-safe, but different ILSMTreeIndexAccessors
+ * can concurrently operate on the same ILSMTree (i.e., the ILSMTree must allow
+ * concurrent operations).
+ */
+public interface ILSMTreeIndexAccessor extends ITreeIndexAccessor {
+ /**
+ * Force a flush of the in-memory component.
+ *
+ * @throws HyracksDataException
+ * @throws TreeIndexException
+ */
+ public void flush() throws HyracksDataException, TreeIndexException;
+
+ /**
+ * Merge all on-disk components.
+ *
+ * @throws HyracksDataException
+ * @throws TreeIndexException
+ */
+ public void merge() throws HyracksDataException, TreeIndexException;
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
new file mode 100644
index 0000000..1954fd2
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -0,0 +1,229 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMTree;
+
+/**
+ * Common code for synchronizing LSM operations like
+ * updates/searches/flushes/merges on any ILSMTree. This class only deals with
+ * synchronizing LSM operations, and delegates the concrete implementations of
+ * the actual operations to the ILSMTree passed to the constructor.
+ *
+ * Synchronization overview:
+ * - threadRefCount and flushFlag (guarded by this harness' monitor) make
+ *   operations that touch the in-memory component wait while a flush is
+ *   pending, and make the flush run only once the last such operation exits.
+ * - diskComponentsSync guards modifications to, and snapshots of, the list of
+ *   on-disk components.
+ * - isMerging admits a single merge at a time; the two alternating searcher
+ *   ref counts let a merge wait for searchers of the pre-merge components.
+ */
+public class LSMHarness {
+    protected static final Logger LOGGER = Logger.getLogger(LSMHarness.class.getName());
+    // Poll interval (ms) while a merge waits for searchers of the old components.
+    protected static final long AFTER_MERGE_CLEANUP_SLEEP = 100;
+
+    private final ILSMTree lsmTree;
+
+    // All accesses to the LSM-Tree's on-disk components are synchronized on diskComponentsSync.
+    private final Object diskComponentsSync = new Object();
+
+    // For synchronizing all operations with flushes.
+    // Currently, all operations block during a flush.
+    // Both fields are guarded by this harness' monitor.
+    private int threadRefCount;
+    private boolean flushFlag;
+
+    // For synchronizing searchers with a concurrent merge.
+    private final AtomicBoolean isMerging = new AtomicBoolean(false);
+    private final AtomicInteger searcherRefCountA = new AtomicInteger(0);
+    private final AtomicInteger searcherRefCountB = new AtomicInteger(0);
+    // Represents the current number of searcher threads that are operating on
+    // the unmerged on-disk components.
+    // We alternate between searcherRefCountA and searcherRefCountB.
+    // Volatile because the reference is swapped under diskComponentsSync in
+    // merge() but read without that lock elsewhere.
+    private volatile AtomicInteger searcherRefCount = searcherRefCountA;
+
+    /**
+     * Creates a harness that synchronizes operations on the given tree.
+     *
+     * @param lsmTree
+     *            The tree that carries out the actual operations.
+     */
+    public LSMHarness(ILSMTree lsmTree) {
+        this.lsmTree = lsmTree;
+        this.threadRefCount = 0;
+        this.flushFlag = false;
+    }
+
+    /**
+     * Registers an operation on the in-memory component, which keeps a flush
+     * from starting until the matching threadExit().
+     */
+    public void threadEnter() {
+        // Reentrant for the internal callers that already hold the monitor;
+        // protects the counter for any external caller.
+        synchronized (this) {
+            threadRefCount++;
+        }
+    }
+
+    /**
+     * Unregisters an operation on the in-memory component. If the in-memory
+     * free page manager reports that it is full, a flush is requested; the
+     * flush itself is performed (while holding this harness' monitor) by the
+     * last operation to exit.
+     *
+     * @throws HyracksDataException
+     *             if the flush fails.
+     * @throws TreeIndexException
+     *             if the flush fails.
+     */
+    public void threadExit() throws HyracksDataException, TreeIndexException {
+        synchronized (this) {
+            threadRefCount--;
+            // Check if we've reached or exceeded the maximum number of pages.
+            if (!flushFlag && lsmTree.getInMemoryFreePageManager().isFull()) {
+                flushFlag = true;
+            }
+            // Flush will only be handled by last exiting thread.
+            if (flushFlag && threadRefCount == 0) {
+                try {
+                    flush();
+                } finally {
+                    // Always clear the flag, even when the flush fails. Otherwise
+                    // every later operation would spin forever waiting for a
+                    // flush that nobody is going to perform. If the in-memory
+                    // component is still full, the next exiting operation
+                    // requests the flush again.
+                    flushFlag = false;
+                }
+            }
+        }
+    }
+
+    /**
+     * Spins until no flush is pending, then registers the calling operation
+     * via threadEnter(). The check and the registration happen atomically
+     * under this harness' monitor.
+     */
+    private void enterWhenNoFlushPending() {
+        boolean waitForFlush = true;
+        do {
+            synchronized (this) {
+                if (!flushFlag) {
+                    // Increments threadRefCount, to force a flush to wait for this operation to finish.
+                    // (a flush can only begin once threadRefCount == 0).
+                    threadEnter();
+                    // Proceed with operation.
+                    waitForFlush = false;
+                }
+            }
+        } while (waitForFlush);
+    }
+
+    /**
+     * Performs an insert, update or delete on the tree, after waiting for any
+     * pending flush. The operation is always unregistered again, even if the
+     * tree throws.
+     *
+     * @param tuple
+     *            The tuple to apply.
+     * @param ctx
+     *            The operation context handed to the tree.
+     * @throws HyracksDataException
+     *             if the operation, or a flush triggered on exit, fails.
+     * @throws TreeIndexException
+     *             if the operation, or a flush triggered on exit, fails.
+     */
+    public void insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ctx) throws HyracksDataException, TreeIndexException {
+        // Wait for ongoing flush to complete.
+        enterWhenNoFlushPending();
+        try {
+            lsmTree.insertUpdateOrDelete(tuple, ctx);
+        } finally {
+            threadExit();
+        }
+    }
+
+    /**
+     * Flushes the in-memory component: asks the tree to produce a new on-disk
+     * component, resets the in-memory component, and registers the new
+     * component under diskComponentsSync.
+     *
+     * NOTE(review): this method takes no lock of its own; threadExit() calls
+     * it while holding this harness' monitor. Confirm that direct callers are
+     * expected to provide their own exclusion against concurrent operations.
+     *
+     * @throws HyracksDataException
+     *             if the tree fails to flush or reset.
+     * @throws TreeIndexException
+     *             if the tree fails to flush or reset.
+     */
+    public void flush() throws HyracksDataException, TreeIndexException {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Flushing LSM-Tree.");
+        }
+        Object newDiskComponent = lsmTree.flush();
+        lsmTree.resetInMemoryComponent();
+        synchronized (diskComponentsSync) {
+            lsmTree.addFlushedComponent(newDiskComponent);
+        }
+    }
+
+    /**
+     * Opens a search over a snapshot of the on-disk components and, if
+     * requested, the in-memory component.
+     *
+     * When the in-memory component is included, the search is registered via
+     * threadEnter(); the matching threadExit() happens in
+     * closeSearchCursor(true). When it is not included, the current searcher
+     * ref count is incremented instead; the matching decrement happens in
+     * closeSearchCursor(false).
+     *
+     * @param cursor
+     *            The cursor to open.
+     * @param pred
+     *            The search predicate.
+     * @param ctx
+     *            The operation context handed to the tree.
+     * @param includeMemComponent
+     *            Whether the in-memory component takes part in the search.
+     * @return The snapshot of on-disk components that the search operates on.
+     * @throws HyracksDataException
+     *             if the tree fails to open the search.
+     * @throws TreeIndexException
+     *             if the tree fails to open the search.
+     */
+    public List<Object> search(ITreeIndexCursor cursor, ISearchPredicate pred, IIndexOpContext ctx, boolean includeMemComponent) throws HyracksDataException, TreeIndexException {
+        // If the search doesn't include the in-memory component, then we don't have
+        // to synchronize with a flush.
+        if (includeMemComponent) {
+            enterWhenNoFlushPending();
+        }
+
+        // Get a snapshot of the current on-disk components.
+        // If includeMemComponent is true, then no concurrent
+        // flush can add another on-disk component (due to threadEnter());
+        // If includeMemComponent is false, then it is possible that a concurrent
+        // flush adds another on-disk component.
+        // Since this mode is only used for merging trees, it doesn't really
+        // matter if the merge excludes the new on-disk component.
+        List<Object> diskComponentSnapshot = new ArrayList<Object>();
+        synchronized (diskComponentsSync) {
+            diskComponentSnapshot.addAll(lsmTree.getDiskComponents());
+            // Only count the searcher when performing a merge (i.e., includeMemComponent is false).
+            if (!includeMemComponent) {
+                searcherRefCount.incrementAndGet();
+            }
+        }
+
+        lsmTree.search(cursor, diskComponentSnapshot, pred, ctx, includeMemComponent);
+        return diskComponentSnapshot;
+    }
+
+    /**
+     * Merges on-disk components. Only one merge may run at a time. After the
+     * tree has produced the merged component, the component list is updated
+     * and the searcher ref counts are swapped under diskComponentsSync; the
+     * method then waits until the pre-merge ref count drops to zero before
+     * asking the tree to clean up the merged components.
+     *
+     * The merge-in-progress flag is released on every exit path, so a failed
+     * or interrupted merge does not block later merges.
+     *
+     * @throws HyracksDataException
+     *             if the tree fails to merge or clean up, or if the calling
+     *             thread is interrupted while waiting for searchers (the
+     *             thread's interrupt status is restored in that case).
+     * @throws TreeIndexException
+     *             if another merge is already in progress, or the tree fails.
+     */
+    public void merge() throws HyracksDataException, TreeIndexException {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Merging LSM-BTree.");
+        }
+        if (!isMerging.compareAndSet(false, true)) {
+            throw new TreeIndexException("Merge already in progress in LSMBTree. Only one concurrent merge allowed.");
+        }
+
+        try {
+            // Point to the current searcher ref count, so we can wait for it later
+            // (after we swap the searcher ref count).
+            AtomicInteger localSearcherRefCount = searcherRefCount;
+
+            List<Object> mergedComponents = new ArrayList<Object>();
+            Object newComponent = lsmTree.merge(mergedComponents);
+
+            // Remove the old components from the list, and add the new merged component.
+            // Also, swap the searcher ref count.
+            synchronized (diskComponentsSync) {
+                lsmTree.addMergedComponent(newComponent, mergedComponents);
+                // Swap the searcher ref count reference, and reset it to zero.
+                if (searcherRefCount == searcherRefCountA) {
+                    searcherRefCount = searcherRefCountB;
+                } else {
+                    searcherRefCount = searcherRefCountA;
+                }
+                searcherRefCount.set(0);
+            }
+
+            // Wait for all searchers that are still accessing the old on-disk
+            // components, then perform the final cleanup of the old components.
+            while (localSearcherRefCount.get() != 0) {
+                try {
+                    Thread.sleep(AFTER_MERGE_CLEANUP_SLEEP);
+                } catch (InterruptedException e) {
+                    // Restore the interrupt status and propagate the exception to
+                    // the caller, so that an appropriate cleanup action can be taken.
+                    Thread.currentThread().interrupt();
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            // Cleanup. At this point we have guaranteed that no searchers are
+            // touching the old on-disk components (localSearcherRefCount == 0).
+            lsmTree.cleanUpAfterMerge(mergedComponents);
+        } finally {
+            // Release the flag on every path; otherwise a failed merge would
+            // make all later merges fail with "Merge already in progress".
+            isMerging.set(false);
+        }
+    }
+
+    /**
+     * @return The searcher ref count that is current at the time of the call.
+     */
+    public AtomicInteger getSearcherRefCount() {
+        return searcherRefCount;
+    }
+
+    /**
+     * Undoes the registration made by search() with the same
+     * includeMemComponent value.
+     *
+     * @param includeMemComponent
+     *            The value that was passed to the matching search() call.
+     * @throws HyracksDataException
+     *             if a flush triggered by threadExit() fails; a
+     *             TreeIndexException from that flush is wrapped.
+     */
+    public void closeSearchCursor(boolean includeMemComponent) throws HyracksDataException {
+        // If the in-memory component was not included in the search, then we don't
+        // need to synchronize with a flush.
+        if (includeMemComponent) {
+            try {
+                threadExit();
+            } catch (TreeIndexException e) {
+                throw new HyracksDataException(e);
+            }
+        } else {
+            // Synchronize with ongoing merges.
+            // NOTE(review): this decrements whichever counter is current now. If
+            // merge() swapped the counters after the matching search() incremented
+            // one, the decrement lands on the other counter. Confirm that only the
+            // merge itself searches with includeMemComponent == false.
+            searcherRefCount.decrementAndGet();
+        }
+    }
+
+    /**
+     * Registers a bulk-loaded component with the tree, under
+     * diskComponentsSync, through the same call used for flushed components.
+     *
+     * @param index
+     *            The bulk-loaded component.
+     */
+    public void addBulkLoadedComponent(Object index) {
+        synchronized (diskComponentsSync) {
+            lsmTree.addFlushedComponent(index);
+        }
+    }
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTree.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTree.java
deleted file mode 100644
index ca780b7..0000000
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTree.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-
-import java.io.File;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileNameManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMTree;
-import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
-import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
-import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
-
-public abstract class LSMTree implements ILSMTree {
- protected static final long AFTER_MERGE_CLEANUP_SLEEP = 100;
-
- // In-memory components.
- protected final BTree memBTree;
- protected final InMemoryFreePageManager memFreePageManager;
-
- // On-disk components.
- protected final ILSMFileNameManager fileNameManager;
- // For creating BTree's used in flush and merge.
- protected final BTreeFactory diskBTreeFactory;
-
- protected final IBufferCache diskBufferCache;
- protected final IFileMapProvider diskFileMapProvider;
- protected LinkedList<BTree> diskBTrees = new LinkedList<BTree>();
-
- protected final MultiComparator cmp;
-
- // For synchronizing all operations with flushes.
- // Currently, all operations block during a flush.
- private int threadRefCount;
- protected boolean flushFlag;
-
- // For synchronizing searchers with a concurrent merge.
- protected AtomicBoolean isMerging = new AtomicBoolean(false);
- protected AtomicInteger searcherRefCountA = new AtomicInteger(0);
- protected AtomicInteger searcherRefCountB = new AtomicInteger(0);
- // Represents the current number of searcher threads that are operating on
- // the unmerged on-disk BTrees.
- // We alternate between searcherRefCountA and searcherRefCountB.
- protected AtomicInteger searcherRefCount = searcherRefCountA;
-
- public LSMTree(IBufferCache memBufferCache, InMemoryFreePageManager memFreePageManager,
- ITreeIndexFrameFactory btreeInteriorFrameFactory, ITreeIndexFrameFactory btreeLeafFrameFactory,
- ILSMFileNameManager fileNameManager, BTreeFactory diskBTreeFactory, IFileMapProvider diskFileMapProvider,
- int fieldCount, MultiComparator cmp) {
- memBTree = new BTree(memBufferCache, fieldCount, cmp, memFreePageManager, btreeInteriorFrameFactory,
- btreeLeafFrameFactory);
- this.memFreePageManager = memFreePageManager;
- this.diskBufferCache = diskBTreeFactory.getBufferCache();
- this.diskFileMapProvider = diskFileMapProvider;
- this.diskBTreeFactory = diskBTreeFactory;
- this.cmp = cmp;
- this.diskBTrees = new LinkedList<BTree>();
- this.threadRefCount = 0;
- this.flushFlag = false;
- this.fileNameManager = fileNameManager;
- }
-
- @Override
- public void create(int indexFileId) throws HyracksDataException {
- memBTree.create(indexFileId);
- }
-
- @Override
- public void close() throws HyracksDataException {
- for (BTree btree : diskBTrees) {
- diskBufferCache.closeFile(btree.getFileId());
- btree.close();
- }
- diskBTrees.clear();
- memBTree.close();
- }
-
- public void threadEnter() {
- threadRefCount++;
- }
-
- public void threadExit() throws HyracksDataException, TreeIndexException {
- synchronized (this) {
- threadRefCount--;
- // Check if we've reached or exceeded the maximum number of pages.
- if (!flushFlag && memFreePageManager.isFull()) {
- flushFlag = true;
- }
- // Flush will only be handled by last exiting thread.
- if (flushFlag && threadRefCount == 0) {
- flush();
- flushFlag = false;
- }
- }
- }
-
- protected void cleanupTrees(List<ITreeIndex> mergingDiskTrees) throws HyracksDataException {
- for (ITreeIndex oldTree : mergingDiskTrees) {
- FileReference fileRef = diskFileMapProvider.lookupFileName(oldTree.getFileId());
- diskBufferCache.closeFile(oldTree.getFileId());
- oldTree.close();
- fileRef.getFile().delete();
- }
- }
-
- protected void resetMemBTree() throws HyracksDataException {
- memFreePageManager.reset();
- memBTree.create(memBTree.getFileId());
- }
-
- protected ITreeIndex createFlushTargetTree(String fileName) throws HyracksDataException {
- return createDiskTree(diskBTreeFactory, fileName, true);
- }
-
- protected ITreeIndex createMergeTargetTree(String fileName) throws HyracksDataException {
- return createDiskTree(diskBTreeFactory, fileName, true);
- }
-
- protected ITreeIndex createDiskTree(TreeFactory diskTreeFactory, String fileName, boolean createTree)
- throws HyracksDataException {
- // Register the new tree file.
- FileReference file = new FileReference(new File(fileName));
- // File will be deleted during cleanup of merge().
- diskBufferCache.createFile(file);
- int diskTreeFileId = diskFileMapProvider.lookupFileId(file);
- // File will be closed during cleanup of merge().
- diskBufferCache.openFile(diskTreeFileId);
- // Create new tree instance.
- ITreeIndex diskTree = diskTreeFactory.createIndexInstance(diskTreeFileId);
- if (createTree) {
- diskTree.create(diskTreeFileId);
- }
- // Tree will be closed during cleanup of merge().
- diskTree.open(diskTreeFileId);
- return diskTree;
- }
-
- @Override
- public abstract void flush() throws HyracksDataException, TreeIndexException;
-
-}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
new file mode 100644
index 0000000..6d9cba3
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -0,0 +1,76 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMTreeIndexAccessor;
+
+/**
+ * Base accessor for LSM trees. Every modification and search is routed
+ * through the LSMHarness, using the operation context given at construction.
+ * Disk-order scans are rejected.
+ */
+public abstract class LSMTreeIndexAccessor implements ILSMTreeIndexAccessor {
+    private LSMHarness lsmHarness;
+    private IIndexOpContext ctx;
+
+    /**
+     * @param lsmHarness
+     *            The harness that all operations are delegated to.
+     * @param ctx
+     *            The operation context, reset before each operation.
+     */
+    public LSMTreeIndexAccessor(LSMHarness lsmHarness, IIndexOpContext ctx) {
+        this.ctx = ctx;
+        this.lsmHarness = lsmHarness;
+    }
+
+    /** Inserts the tuple through the harness. */
+    @Override
+    public void insert(ITupleReference tuple) throws HyracksDataException, TreeIndexException {
+        modify(IndexOp.INSERT, tuple);
+    }
+
+    /** Updates are carried out exactly like inserts. */
+    @Override
+    public void update(ITupleReference tuple) throws HyracksDataException, TreeIndexException {
+        modify(IndexOp.INSERT, tuple);
+    }
+
+    /** Deletes the tuple through the harness. */
+    @Override
+    public void delete(ITupleReference tuple) throws HyracksDataException, TreeIndexException {
+        modify(IndexOp.DELETE, tuple);
+    }
+
+    /** Opens a search that includes the in-memory component. */
+    @Override
+    public void search(ITreeIndexCursor cursor, ISearchPredicate searchPred) throws HyracksDataException,
+            TreeIndexException {
+        ctx.reset(IndexOp.SEARCH);
+        lsmHarness.search(cursor, searchPred, ctx, true);
+    }
+
+    /**
+     * Always throws: a disk-order scan cannot correctly resolve deleted
+     * tuples in an LSM tree.
+     */
+    @Override
+    public ITreeIndexCursor createDiskOrderScanCursor() {
+        throw diskOrderScanUnsupported();
+    }
+
+    /**
+     * Always throws: a disk-order scan cannot correctly resolve deleted
+     * tuples in an LSM tree.
+     */
+    @Override
+    public void diskOrderScan(ITreeIndexCursor cursor) throws HyracksDataException {
+        throw diskOrderScanUnsupported();
+    }
+
+    /** Delegates to the harness' flush. */
+    @Override
+    public void flush() throws HyracksDataException, TreeIndexException {
+        lsmHarness.flush();
+    }
+
+    /** Delegates to the harness' merge. */
+    @Override
+    public void merge() throws HyracksDataException, TreeIndexException {
+        lsmHarness.merge();
+    }
+
+    // Resets the context to the given operation and hands the tuple to the harness.
+    private void modify(IndexOp op, ITupleReference tuple) throws HyracksDataException, TreeIndexException {
+        ctx.reset(op);
+        lsmHarness.insertUpdateOrDelete(tuple, ctx);
+    }
+
+    // Builds the exception shared by both disk-order scan entry points.
+    private static UnsupportedOperationException diskOrderScanUnsupported() {
+        return new UnsupportedOperationException("DiskOrderScan not supported by LSMTree.");
+    }
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 8070a8e..57cb36c 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -17,20 +17,20 @@
import java.io.File;
import java.io.FilenameFilter;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
-import java.util.concurrent.atomic.AtomicInteger;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoadContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
@@ -41,9 +41,12 @@
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileNameManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMTree;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTree;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeFactory;
import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
@@ -51,18 +54,47 @@
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
-public class LSMRTree extends LSMTree {
-
+public class LSMRTree implements ILSMTree {
+
+    /**
+     * One LSM-RTree component: an RTree paired with a BTree. Used both for
+     * the in-memory component and for each on-disk component.
+     *
+     * Declared static because it uses no state of the enclosing LSMRTree, so
+     * instances do not need to carry a reference to it.
+     */
+    public static class LSMRTreeComponent {
+        private final RTree rtree;
+        private final BTree btree;
+
+        /**
+         * @param rtree
+         *            The RTree of this component.
+         * @param btree
+         *            The BTree of this component.
+         */
+        LSMRTreeComponent(RTree rtree, BTree btree) {
+            this.rtree = rtree;
+            this.btree = btree;
+        }
+
+        /** @return The RTree of this component. */
+        public RTree getRTree() {
+            return rtree;
+        }
+
+        /** @return The BTree of this component. */
+        public BTree getBTree() {
+            return btree;
+        }
+    }
+
+ private final LSMHarness lsmHarness;
+
// In-memory components.
- private final RTree memRTree;
+ private final LSMRTreeComponent memComponent;
+ protected final InMemoryFreePageManager memFreePageManager;
private final static int MEM_RTREE_FILE_ID = 0;
private final static int MEM_BTREE_FILE_ID = 1;
// On-disk components.
+ private final ILSMFileNameManager fileNameManager;
+ protected final IBufferCache diskBufferCache;
+ protected final IFileMapProvider diskFileMapProvider;
// For creating RTree's used in flush and merge.
private final RTreeFactory diskRTreeFactory;
- private LinkedList<RTree> diskRTrees = new LinkedList<RTree>();
+ // For creating BTree's used in flush and merge.
+ private final BTreeFactory diskBTreeFactory;
+ // List of LSMRTreeComponent instances. Using Object for better sharing via ILSMTree + LSMHarness.
+ private final LinkedList<Object> diskComponents = new LinkedList<Object>();
+ private MultiComparator btreeCmp;
+
// Common for in-memory and on-disk components.
private final ITreeIndexFrameFactory rtreeInteriorFrameFactory;
private final ITreeIndexFrameFactory btreeInteriorFrameFactory;
@@ -74,24 +106,29 @@
ITreeIndexFrameFactory btreeInteriorFrameFactory, ITreeIndexFrameFactory btreeLeafFrameFactory,
ILSMFileNameManager fileNameManager, RTreeFactory diskRTreeFactory, BTreeFactory diskBTreeFactory,
IFileMapProvider diskFileMapProvider, int fieldCount, MultiComparator rtreeCmp, MultiComparator btreeCmp) {
-
- super(memBufferCache, memFreePageManager, btreeInteriorFrameFactory, btreeLeafFrameFactory, fileNameManager,
- diskBTreeFactory, diskFileMapProvider, fieldCount, btreeCmp);
- memRTree = new RTree(memBufferCache, fieldCount, rtreeCmp, memFreePageManager, rtreeInteriorFrameFactory,
+ RTree memRTree = new RTree(memBufferCache, fieldCount, rtreeCmp, memFreePageManager, rtreeInteriorFrameFactory,
rtreeLeafFrameFactory);
-
+ BTree memBTree = new BTree(memBufferCache, fieldCount, btreeCmp, memFreePageManager, btreeInteriorFrameFactory,
+ btreeLeafFrameFactory);
+ memComponent = new LSMRTreeComponent(memRTree, memBTree);
+ this.memFreePageManager = memFreePageManager;
+ this.diskBufferCache = diskBTreeFactory.getBufferCache();
+ this.diskFileMapProvider = diskFileMapProvider;
+ this.diskBTreeFactory = diskBTreeFactory;
+ this.fileNameManager = fileNameManager;
this.rtreeInteriorFrameFactory = rtreeInteriorFrameFactory;
this.rtreeLeafFrameFactory = rtreeLeafFrameFactory;
this.btreeInteriorFrameFactory = btreeInteriorFrameFactory;
this.btreeLeafFrameFactory = btreeLeafFrameFactory;
-
this.diskRTreeFactory = diskRTreeFactory;
+ this.btreeCmp = btreeCmp;
+ this.lsmHarness = new LSMHarness(this);
}
@Override
public void create(int indexFileId) throws HyracksDataException {
- super.create(MEM_BTREE_FILE_ID);
- memRTree.create(MEM_RTREE_FILE_ID);
+ memComponent.getRTree().create(MEM_RTREE_FILE_ID);
+ memComponent.getBTree().create(MEM_BTREE_FILE_ID);
}
/**
@@ -112,8 +149,8 @@
*/
@Override
public void open(int indexFileId) throws HyracksDataException {
- memRTree.open(MEM_RTREE_FILE_ID);
- memBTree.open(MEM_BTREE_FILE_ID);
+ memComponent.getRTree().open(MEM_RTREE_FILE_ID);
+ memComponent.getBTree().open(MEM_BTREE_FILE_ID);
File dir = new File(fileNameManager.getBaseDir());
FilenameFilter rtreeFilter = new FilenameFilter() {
public boolean accept(File dir, String name) {
@@ -135,34 +172,52 @@
Comparator<String> fileNameCmp = fileNameManager.getFileNameComparator();
Arrays.sort(rtreeFiles, fileNameCmp);
- for (String fileName : rtreeFiles) {
- RTree rtree = (RTree) createDiskTree(diskRTreeFactory, fileName, false);
- diskRTrees.add(rtree);
- }
-
Arrays.sort(btreeFiles, fileNameCmp);
- for (String fileName : btreeFiles) {
- BTree btree = (BTree) createDiskTree(diskBTreeFactory, fileName, false);
- diskBTrees.add(btree);
+ // Assert rtreeFiles.size() == btreeFiles.size()
+ for (int i = 0; i < rtreeFiles.length; i++) {
+ RTree rtree = (RTree) createDiskTree(diskRTreeFactory, rtreeFiles[i], false);
+ BTree btree = (BTree) createDiskTree(diskBTreeFactory, btreeFiles[i], false);
+ LSMRTreeComponent diskComponent = new LSMRTreeComponent(rtree, btree);
+ diskComponents.add(diskComponent);
}
}
@Override
public void close() throws HyracksDataException {
- super.close();
- for (RTree rtree : diskRTrees) {
+ for (Object o : diskComponents) {
+ LSMRTreeComponent diskComponent = (LSMRTreeComponent) o;
+ RTree rtree = diskComponent.getRTree();
+ BTree btree = diskComponent.getBTree();
diskBufferCache.closeFile(rtree.getFileId());
rtree.close();
+ diskBufferCache.closeFile(btree.getFileId());
+ btree.close();
}
- diskRTrees.clear();
- memRTree.close();
+ diskComponents.clear();
+ memComponent.getRTree().close();
+ memComponent.getBTree().close();
}
- @Override
- public ITreeIndexAccessor createAccessor() {
- return new LSMRTreeAccessor(this);
+    // TODO: Candidate for more code sharing.
+    /**
+     * Registers and opens the named file with the disk buffer cache, builds a
+     * tree instance on it via the given factory, optionally creates the tree,
+     * and opens it.
+     *
+     * @param diskTreeFactory
+     *            Factory that builds the tree instance for the file id.
+     * @param fileName
+     *            Name of the file backing the tree.
+     * @param createTree
+     *            If true, create() is called on the tree before it is opened.
+     * @return The opened tree.
+     * @throws HyracksDataException
+     *             if the buffer cache or the tree fails.
+     */
+    protected ITreeIndex createDiskTree(TreeFactory diskTreeFactory, String fileName, boolean createTree)
+            throws HyracksDataException {
+        // Register the tree's file; it is deleted again during cleanup of merge().
+        FileReference treeFile = new FileReference(new File(fileName));
+        diskBufferCache.createFile(treeFile);
+        int fileId = diskFileMapProvider.lookupFileId(treeFile);
+        // The file stays open until cleanup of merge() closes it.
+        diskBufferCache.openFile(fileId);
+        // Build the tree on top of the opened file.
+        ITreeIndex tree = diskTreeFactory.createIndexInstance(fileId);
+        if (createTree) {
+            tree.create(fileId);
+        }
+        // The tree stays open until cleanup of merge() closes it.
+        tree.open(fileId);
+        return tree;
}
-
+
@Override
public IIndexBulkLoadContext beginBulkLoad(float fillFactor) throws TreeIndexException, HyracksDataException {
// Note that by using a flush target file name, we state that the new
@@ -189,10 +244,8 @@
public void endBulkLoad(IIndexBulkLoadContext ictx) throws HyracksDataException {
LSMRTreeBulkLoadContext bulkLoadCtx = (LSMRTreeBulkLoadContext) ictx;
bulkLoadCtx.getRTree().endBulkLoad(bulkLoadCtx.getBulkLoadCtx());
- synchronized (diskRTrees) {
- diskRTrees.addFirst(bulkLoadCtx.getRTree());
- diskBTrees.addFirst(bulkLoadCtx.getBTree());
- }
+ LSMRTreeComponent diskComponent = new LSMRTreeComponent(bulkLoadCtx.getRTree(), bulkLoadCtx.getBTree());
+ lsmHarness.addBulkLoadedComponent(diskComponent);
}
@Override
@@ -230,82 +283,24 @@
return 0;
}
- private void insertOrDelete(ITupleReference tuple, ITreeIndexAccessor accessor) throws HyracksDataException,
+ public void insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException,
TreeIndexException {
- boolean waitForFlush = true;
- do {
- // Wait for ongoing flush to complete.
- synchronized (this) {
- if (!flushFlag) {
- // Increments threadRefCount, to force a flush to wait for
- // this operation to finish.
- // (a flush can only begin once threadRefCount == 0).
- threadEnter();
- // Proceed with operation.
- waitForFlush = false;
- }
- }
- } while (waitForFlush);
- accessor.insert(tuple);
- try {
- threadExit();
- } catch (Exception e) {
- e.printStackTrace();
+ LSMRTreeOpContext ctx = (LSMRTreeOpContext) ictx;
+ if (ctx.getIndexOp() == IndexOp.INSERT) {
+ ctx.memRTreeAccessor.insert(tuple);
+ } else {
+ // Assert ctx.getIndexOp() == IndexOp.DELETE
+ ctx.memBTreeAccessor.insert(tuple);
}
}
- private void insert(ITupleReference tuple, LSMRTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
- insertOrDelete(tuple, ctx.memRTreeAccessor);
- }
-
- private void delete(ITupleReference tuple, LSMRTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
- insertOrDelete(tuple, ctx.memBTreeAccessor);
- }
-
- private Pair<List<ITreeIndex>, List<ITreeIndex>> search(ITreeIndexCursor cursor, ISearchPredicate rtreeSearchPred,
- LSMRTreeOpContext ctx, boolean includeMemRTree) throws HyracksDataException, TreeIndexException {
- // If the search doesn't include the in-memory RTree, then we don't have
- // to synchronize with a flush.
- if (includeMemRTree) {
- boolean waitForFlush = true;
- do {
- synchronized (this) {
- if (!flushFlag) {
- // The corresponding threadExit() is in
- // LSMTreeRangeSearchCursor.close().
- threadEnter();
- waitForFlush = false;
- }
- }
- } while (waitForFlush);
- }
-
- // Get a snapshot of the current on-disk RTrees and BTrees.
- // If includeMemRTree is true, then no concurrent
- // flush can add another on-disk RTree (due to threadEnter());
- // If includeMemRTree is false, then it is possible that a concurrent
- // flush adds another on-disk RTree.
- // Since this mode is only used for merging trees, it doesn't really
- // matter if the merge excludes the new on-disk RTree.
- List<ITreeIndex> diskRTreesSnapshot = new ArrayList<ITreeIndex>();
- List<ITreeIndex> diskBTreesSnapshot = new ArrayList<ITreeIndex>();
- AtomicInteger localSearcherRefCount = null;
- synchronized (diskRTrees) {
- diskRTreesSnapshot.addAll(diskRTrees);
- diskBTreesSnapshot.addAll(diskBTrees);
- // Only remember the search ref count when performing a merge (i.e.,
- // includeMemRTree is false).
- if (!includeMemRTree) {
- localSearcherRefCount = searcherRefCount;
- localSearcherRefCount.incrementAndGet();
- }
- }
-
- int numDiskTrees = diskRTreesSnapshot.size();
-
+ public void search(ITreeIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred,
+ IIndexOpContext ictx, boolean includeMemComponent) throws HyracksDataException, TreeIndexException {
+ LSMRTreeOpContext ctx = (LSMRTreeOpContext) ictx;
+ int numDiskTrees = diskComponents.size();
ITreeIndexAccessor[] bTreeAccessors;
int diskBTreeIx = 0;
- if (includeMemRTree) {
+ if (includeMemComponent) {
bTreeAccessors = new ITreeIndexAccessor[numDiskTrees + 1];
bTreeAccessors[0] = ctx.memBTreeAccessor;
diskBTreeIx++;
@@ -313,22 +308,23 @@
bTreeAccessors = new ITreeIndexAccessor[numDiskTrees];
}
- ListIterator<ITreeIndex> diskBTreesIter = diskBTreesSnapshot.listIterator();
+ ListIterator<Object> diskBTreesIter = diskComponents.listIterator();
while (diskBTreesIter.hasNext()) {
- BTree diskBTree = (BTree) diskBTreesIter.next();
+ LSMRTreeComponent component = (LSMRTreeComponent) diskBTreesIter.next();
+ BTree diskBTree = component.getBTree();
bTreeAccessors[diskBTreeIx] = diskBTree.createAccessor();
diskBTreeIx++;
}
LSMRTreeSearchCursor lsmRTreeCursor = (LSMRTreeSearchCursor) cursor;
LSMRTreeCursorInitialState initialState = new LSMRTreeCursorInitialState(numDiskTrees + 1,
- rtreeLeafFrameFactory, rtreeInteriorFrameFactory, btreeLeafFrameFactory, cmp, bTreeAccessors, this,
- includeMemRTree, localSearcherRefCount);
- lsmRTreeCursor.open(initialState, rtreeSearchPred);
+ rtreeLeafFrameFactory, rtreeInteriorFrameFactory, btreeLeafFrameFactory, btreeCmp, bTreeAccessors,
+ includeMemComponent, lsmHarness);
+ lsmRTreeCursor.open(initialState, pred);
int cursorIx = 1;
- if (includeMemRTree) {
- ctx.memRTreeAccessor.search(((LSMRTreeSearchCursor) lsmRTreeCursor).getCursor(0), rtreeSearchPred);
+ if (includeMemComponent) {
+ ctx.memRTreeAccessor.search(((LSMRTreeSearchCursor) lsmRTreeCursor).getCursor(0), pred);
cursorIx = 1;
} else {
cursorIx = 0;
@@ -336,25 +332,23 @@
// Open cursors of on-disk RTrees
ITreeIndexAccessor[] diskRTreeAccessors = new ITreeIndexAccessor[numDiskTrees];
- ListIterator<ITreeIndex> diskRTreesIter = diskRTreesSnapshot.listIterator();
+ ListIterator<Object> diskRTreesIter = diskComponents.listIterator();
int diskRTreeIx = 0;
while (diskRTreesIter.hasNext()) {
- RTree diskRTree = (RTree) diskRTreesIter.next();
+ LSMRTreeComponent component = (LSMRTreeComponent) diskRTreesIter.next();
+ RTree diskRTree = component.getRTree();
diskRTreeAccessors[diskRTreeIx] = diskRTree.createAccessor();
- diskRTreeAccessors[diskRTreeIx].search(lsmRTreeCursor.getCursor(cursorIx), rtreeSearchPred);
+ diskRTreeAccessors[diskRTreeIx].search(lsmRTreeCursor.getCursor(cursorIx), pred);
cursorIx++;
diskRTreeIx++;
}
- return new Pair<List<ITreeIndex>, List<ITreeIndex>>(diskRTreesSnapshot, diskBTreesSnapshot);
-
}
@Override
- public void flush() throws HyracksDataException, TreeIndexException {
-
+ public Object flush() throws HyracksDataException, TreeIndexException {
// scan the memory RTree
- ITreeIndexAccessor memRTreeAccessor = memRTree.createAccessor();
+ ITreeIndexAccessor memRTreeAccessor = memComponent.getRTree().createAccessor();
ITreeIndexCursor rtreeScanCursor = memRTreeAccessor.createSearchCursor();
SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
@@ -377,7 +371,7 @@
diskRTree.endBulkLoad(rtreeBulkLoadCtx);
// scan the memory BTree
- ITreeIndexAccessor memBTreeAccessor = memBTree.createAccessor();
+ ITreeIndexAccessor memBTreeAccessor = memComponent.getBTree().createAccessor();
ITreeIndexCursor btreeScanCursor = memBTreeAccessor.createSearchCursor();
RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null, null);
memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate);
@@ -396,32 +390,17 @@
btreeScanCursor.close();
}
diskBTree.endBulkLoad(btreeBulkLoadCtx);
-
- resetMemoryTrees();
-
- synchronized (diskRTrees) {
- diskRTrees.addFirst(diskRTree);
- diskBTrees.addFirst(diskBTree);
- }
+ return new LSMRTreeComponent(diskRTree, diskBTree);
}
@Override
- public void merge() throws HyracksDataException, TreeIndexException {
- if (!isMerging.compareAndSet(false, true)) {
- throw new TreeIndexException("Merge already in progress in LSMRTree. Only one concurrent merge allowed.");
- }
-
- // Point to the current searcher ref count, so we can wait for it later
- // (after we swap the searcher ref count).
- AtomicInteger localSearcherRefCount = searcherRefCount;
-
- LSMRTreeOpContext ctx = createOpContext();
+ public Object merge(List<Object> mergedComponents) throws HyracksDataException, TreeIndexException {
+ IIndexOpContext ctx = createOpContext();
ITreeIndexCursor cursor = new LSMRTreeSearchCursor();
- SearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
+ ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
// Scan the RTrees, ignoring the in-memory RTree.
- Pair<List<ITreeIndex>, List<ITreeIndex>> mergingDiskTreesPair = search(cursor, rtreeSearchPred, ctx, false);
- List<ITreeIndex> mergingDiskRTrees = mergingDiskTreesPair.getFirst();
- List<ITreeIndex> mergingDiskBTrees = mergingDiskTreesPair.getSecond();
+ List<Object> mergingComponents = lsmHarness.search(cursor, rtreeSearchPred, ctx, false);
+ mergedComponents.addAll(mergingComponents);
// Bulk load the tuples from all on-disk RTrees into the new RTree.
String fileName = fileNameManager.getMergeFileName();
@@ -439,115 +418,76 @@
cursor.close();
}
mergedRTree.endBulkLoad(bulkLoadCtx);
-
- // Remove the old RTrees and BTrees from the list, and add the new
- // merged RTree and an empty BTree
- // Also, swap the searchRefCount.
- synchronized (diskRTrees) {
- diskRTrees.removeAll(mergingDiskRTrees);
- diskRTrees.addLast(mergedRTree);
-
- diskBTrees.removeAll(mergingDiskBTrees);
- diskBTrees.addLast(mergedBTree);
- // Swap the searcher ref count reference, and reset it to zero.
- if (searcherRefCount == searcherRefCountA) {
- searcherRefCount = searcherRefCountB;
- } else {
- searcherRefCount = searcherRefCountA;
- }
- searcherRefCount.set(0);
- }
-
- // Wait for all searchers that are still accessing the old on-disk
- // RTrees and BTrees, then perform the final cleanup of the old RTrees
- // and BTrees.
- while (localSearcherRefCount.get() != 0) {
- try {
- Thread.sleep(AFTER_MERGE_CLEANUP_SLEEP);
- } catch (InterruptedException e) {
- // Propagate the exception to the caller, so that an appropriate
- // cleanup action can be taken.
- throw new HyracksDataException(e);
- }
- }
-
- // Cleanup. At this point we have guaranteed that no searchers are
- // touching the old on-disk RTrees and BTrees (localSearcherRefCount ==
- // 0).
- cleanupTrees(mergingDiskRTrees);
- cleanupTrees(mergingDiskBTrees);
- isMerging.set(false);
-
+ return new LSMRTreeComponent(mergedRTree, mergedBTree);
}
- public void resetMemoryTrees() throws HyracksDataException {
- resetMemBTree();
- memRTree.create(MEM_RTREE_FILE_ID);
+ @Override
+ public void addMergedComponent(Object newComponent, List<Object> mergedComponents) {
+ diskComponents.removeAll(mergedComponents);
+ diskComponents.addLast((LSMRTreeComponent) newComponent);
}
+ @Override
+ public void cleanUpAfterMerge(List<Object> mergedComponents) throws HyracksDataException {
+ for (Object o : mergedComponents) {
+ LSMRTreeComponent component = (LSMRTreeComponent) o;
+ BTree oldBTree = component.getBTree();
+ FileReference btreeFileRef = diskFileMapProvider.lookupFileName(oldBTree.getFileId());
+ diskBufferCache.closeFile(oldBTree.getFileId());
+ oldBTree.close();
+ btreeFileRef.getFile().delete();
+ RTree oldRTree = component.getRTree();
+ FileReference rtreeFileRef = diskFileMapProvider.lookupFileName(oldRTree.getFileId());
+ diskBufferCache.closeFile(oldRTree.getFileId());
+ oldRTree.close();
+ rtreeFileRef.getFile().delete();
+ }
+ }
+
+ @Override
+ public void addFlushedComponent(Object index) {
+ diskComponents.addFirst((LSMRTreeComponent) index);
+ }
+
+ @Override
+ public InMemoryFreePageManager getInMemoryFreePageManager() {
+ return memFreePageManager;
+ }
+
+ @Override
+ public void resetInMemoryComponent() throws HyracksDataException {
+ memComponent.getRTree().create(MEM_RTREE_FILE_ID);
+ memComponent.getBTree().create(MEM_BTREE_FILE_ID);
+ memFreePageManager.reset();
+ }
+
+ @Override
+ public List<Object> getDiskComponents() {
+ return diskComponents;
+ }
+
protected LSMRTreeOpContext createOpContext() {
-
- return new LSMRTreeOpContext((RTree.RTreeAccessor) memRTree.createAccessor(),
+ return new LSMRTreeOpContext((RTree.RTreeAccessor) memComponent.getRTree().createAccessor(),
(IRTreeLeafFrame) rtreeLeafFrameFactory.createFrame(),
(IRTreeInteriorFrame) rtreeInteriorFrameFactory.createFrame(), memFreePageManager
- .getMetaDataFrameFactory().createFrame(), 8, (BTree.BTreeAccessor) memBTree.createAccessor(),
+ .getMetaDataFrameFactory().createFrame(), 8, (BTree.BTreeAccessor) memComponent.getBTree().createAccessor(),
btreeLeafFrameFactory, btreeInteriorFrameFactory, memFreePageManager.getMetaDataFrameFactory()
- .createFrame(), cmp);
+ .createFrame(), btreeCmp);
}
-
- private class LSMRTreeAccessor implements ITreeIndexAccessor {
- private LSMRTree lsmRTree;
- private LSMRTreeOpContext ctx;
-
- public LSMRTreeAccessor(LSMRTree lsmRTree) {
- this.lsmRTree = lsmRTree;
- this.ctx = lsmRTree.createOpContext();
-
- }
-
- @Override
- public void insert(ITupleReference tuple) throws HyracksDataException, TreeIndexException {
- ctx.reset(IndexOp.INSERT);
- lsmRTree.insert(tuple, ctx);
- }
-
- @Override
- public void update(ITupleReference tuple) throws HyracksDataException, TreeIndexException {
- throw new UnsupportedOperationException("Update not supported by LSMRTree");
- }
-
- @Override
- public void delete(ITupleReference tuple) throws HyracksDataException, TreeIndexException {
- ctx.reset(IndexOp.DELETE);
- lsmRTree.delete(tuple, ctx);
+
+ @Override
+ public ITreeIndexAccessor createAccessor() {
+ return new LSMRTreeAccessor(lsmHarness, createOpContext());
+ }
+
+ private class LSMRTreeAccessor extends LSMTreeIndexAccessor {
+ public LSMRTreeAccessor(LSMHarness lsmHarness, IIndexOpContext ctx) {
+ super(lsmHarness, ctx);
}
@Override
public ITreeIndexCursor createSearchCursor() {
return new LSMRTreeSearchCursor();
}
-
- @Override
- public void search(ITreeIndexCursor cursor, ISearchPredicate searchPred) throws HyracksDataException,
- TreeIndexException {
- ctx.reset(IndexOp.SEARCH);
- // TODO: fix exception handling throughout LSM tree.
- try {
- lsmRTree.search(cursor, searchPred, ctx, true);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public ITreeIndexCursor createDiskOrderScanCursor() {
- throw new UnsupportedOperationException("DiskOrderScan not supported by LSMRTree.");
- }
-
- @Override
- public void diskOrderScan(ITreeIndexCursor cursor) throws HyracksDataException {
- throw new UnsupportedOperationException("DiskOrderScan not supported by LSMRTree.");
- }
}
-
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeCursorInitialState.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeCursorInitialState.java
index 487d7d1..c32bf06 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeCursorInitialState.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeCursorInitialState.java
@@ -15,12 +15,11 @@
package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
-import java.util.concurrent.atomic.AtomicInteger;
-
import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
public class LSMRTreeCursorInitialState implements ICursorInitialState {
@@ -30,24 +29,22 @@
private ITreeIndexFrameFactory rtreeLeafFrameFactory;
private ITreeIndexFrameFactory btreeLeafFrameFactory;
private MultiComparator btreeCmp;
- private LSMRTree lsmRTree;
private ITreeIndexAccessor[] bTreeAccessors;
private final boolean includeMemRTree;
- private final AtomicInteger searcherRefCount;
+ private final LSMHarness lsmHarness;
public LSMRTreeCursorInitialState(int numberOfTrees, ITreeIndexFrameFactory rtreeLeafFrameFactory,
ITreeIndexFrameFactory rtreeInteriorFrameFactory, ITreeIndexFrameFactory btreeLeafFrameFactory,
- MultiComparator btreeCmp, ITreeIndexAccessor[] bTreeAccessors, LSMRTree lsmRTree, boolean includeMemRTree,
- AtomicInteger searcherRefCount) {
+ MultiComparator btreeCmp, ITreeIndexAccessor[] bTreeAccessors, boolean includeMemRTree,
+ LSMHarness lsmHarness) {
this.numberOfTrees = numberOfTrees;
this.rtreeLeafFrameFactory = rtreeLeafFrameFactory;
this.rtreeInteriorFrameFactory = rtreeInteriorFrameFactory;
this.btreeLeafFrameFactory = btreeLeafFrameFactory;
this.btreeCmp = btreeCmp;
- this.lsmRTree = lsmRTree;
this.bTreeAccessors = bTreeAccessors;
this.includeMemRTree = includeMemRTree;
- this.searcherRefCount = searcherRefCount;
+ this.lsmHarness = lsmHarness;
}
public int getNumberOfTrees() {
@@ -83,16 +80,12 @@
return bTreeAccessors;
}
- public LSMRTree getLsmRTree() {
- return lsmRTree;
- }
-
public boolean getIncludeMemRTree() {
return includeMemRTree;
}
- public AtomicInteger getSearcherRefCount() {
- return searcherRefCount;
+ public LSMHarness getLSMHarness() {
+ return lsmHarness;
}
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
index f8a4432..5be0e63 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
@@ -33,7 +33,8 @@
private BTreeOpContext btreeOpContext;
public final RTree.RTreeAccessor memRTreeAccessor;
public final BTree.BTreeAccessor memBTreeAccessor;
-
+ private IndexOp op;
+
public LSMRTreeOpContext(RTree.RTreeAccessor memRtreeAccessor, IRTreeLeafFrame rtreeLeafFrame,
IRTreeInteriorFrame rtreeInteriorFrame, ITreeIndexMetaDataFrame rtreeMetaFrame, int rTreeHeightHint,
BTree.BTreeAccessor memBtreeAccessor, ITreeIndexFrameFactory btreeLeafFrameFactory,
@@ -53,6 +54,7 @@
} else if (newOp == IndexOp.DELETE) {
btreeOpContext.reset(IndexOp.INSERT);
}
+ this.op = newOp;
}
@Override
@@ -60,4 +62,7 @@
}
+ public IndexOp getIndexOp() {
+ return op;
+ }
}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
index cdcbb2b..9fd47e9 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
@@ -15,8 +15,6 @@
package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
-import java.util.concurrent.atomic.AtomicInteger;
-
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
@@ -28,6 +26,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
import edu.uci.ics.hyracks.storage.am.rtree.impls.RTreeSearchCursor;
@@ -42,11 +41,10 @@
private int currentCursror;
private MultiComparator btreeCmp;
private int numberOfTrees;
- private LSMRTree lsmRTree;
private RangePredicate btreeRangePredicate;
private ITupleReference frameTuple;
private boolean includeMemRTree;
- private AtomicInteger searcherRefCount;
+ private LSMHarness lsmHarness;
public LSMRTreeSearchCursor() {
currentCursror = 0;
@@ -106,10 +104,9 @@
@Override
public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
LSMRTreeCursorInitialState lsmInitialState = (LSMRTreeCursorInitialState) initialState;
- lsmRTree = lsmInitialState.getLsmRTree();
btreeCmp = lsmInitialState.getBTreeCmp();
includeMemRTree = lsmInitialState.getIncludeMemRTree();
- searcherRefCount = lsmInitialState.getSearcherRefCount();
+ lsmHarness = lsmInitialState.getLSMHarness();
numberOfTrees = lsmInitialState.getNumberOfTrees();
diskBTreeAccessors = lsmInitialState.getBTreeAccessors();
@@ -135,24 +132,15 @@
@Override
public void close() throws HyracksDataException {
- for (int i = 0; i < numberOfTrees; i++) {
+ try {
+ for (int i = 0; i < numberOfTrees; i++) {
rtreeCursors[i].close();
btreeCursors[i].close();
}
rtreeCursors = null;
btreeCursors = null;
-
- // If the in-memory RTree was not included in the search, then we don't
- // need to synchronize with a flush.
- if (includeMemRTree) {
- try {
- lsmRTree.threadExit();
- } catch (TreeIndexException e) {
- throw new HyracksDataException(e);
- }
- } else {
- // Synchronize with ongoing merges.
- searcherRefCount.decrementAndGet();
+ } finally {
+ lsmHarness.closeSearchCursor(includeMemRTree);
}
}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
index 0113bff..90c41d9 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
@@ -23,7 +23,7 @@
import edu.uci.ics.hyracks.storage.am.btree.tests.IOrderedIndexTestContext;
import edu.uci.ics.hyracks.storage.am.btree.tests.OrderedIndexTestDriver;
import edu.uci.ics.hyracks.storage.am.btree.tests.OrderedIndexTestUtils;
-import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMTreeIndexAccessor;
@SuppressWarnings("rawtypes")
public abstract class LSMBTreeMergeTestDriver extends OrderedIndexTestDriver {
@@ -57,8 +57,8 @@
}
}
- LSMBTree lsmBTree = (LSMBTree) ctx.getIndex();
- lsmBTree.merge();
+ ILSMTreeIndexAccessor accessor = (ILSMTreeIndexAccessor) ctx.getIndexAccessor();
+ accessor.merge();
OrderedIndexTestUtils.checkPointSearches(ctx);
OrderedIndexTestUtils.checkOrderedScan(ctx);