Fixed a race between mergers and searchers in LSM indexes. Completed multi-threading tests for LSM B-Tree.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1137 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
index 5fce2a1..c1d43ab 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
@@ -100,7 +100,7 @@
@Override
public void close() {
- fileId = -1;
+ fileId = -666;
}
private void diskOrderScan(ITreeIndexCursor icursor, BTreeOpContext ctx) throws HyracksDataException {
@@ -494,7 +494,14 @@
}
private void performOp(int pageId, ICachedPage parent, boolean parentIsReadLatched, BTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
- ICachedPage node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
+ ICachedPage node = null;
+ try {
+ node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.out.println("OH NO!");
+ System.out.println("OH NO!");
+ }
ctx.interiorFrame.setPage(node);
// this check performs an unprotected read in the page
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/test/TreeIndexMultiThreadTestDriver.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/test/TreeIndexMultiThreadTestDriver.java
index a3027c7..4395c01 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/test/TreeIndexMultiThreadTestDriver.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/test/TreeIndexMultiThreadTestDriver.java
@@ -69,7 +69,7 @@
workers[j].start();
}
// Join worker threads.
- for (int j = 0; j < numThreads; j++) {
+ for (int j = 0; j < numThreads; j++) {
workers[j].join();
}
long end = System.currentTimeMillis();
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index fa25083..ae7eb51 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -22,6 +22,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -141,7 +142,7 @@
}
@Override
- public void close() throws HyracksDataException {
+ public void close() throws HyracksDataException {
for (Object o : diskBTrees) {
BTree btree = (BTree) o;
diskBufferCache.closeFile(btree.getFileId());
@@ -152,7 +153,7 @@
}
@Override
- public void insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException, TreeIndexException {
+ public boolean insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException, TreeIndexException {
LSMBTreeOpContext ctx = (LSMBTreeOpContext) ictx;
// TODO: This will become much simpler once the BTree supports a true upsert operation.
try {
@@ -163,14 +164,15 @@
// TODO: The methods below are very inefficient, we'd rather like
// to flip the antimatter bit one single BTree traversal.
if (ctx.getIndexOp() == IndexOp.DELETE) {
- deleteExistingKey(tuple, ctx);
+ return deleteExistingKey(tuple, ctx);
} else {
- insertOrUpdateExistingKey(tuple, ctx);
+ return insertOrUpdateExistingKey(tuple, ctx);
}
}
+ return true;
}
- private void deleteExistingKey(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
+ private boolean deleteExistingKey(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
// We assume that tuple given by the user for deletion only contains the
// key fields, but not any non-key fields.
// Therefore, to set the delete bit in the tuple that already exist in
@@ -195,30 +197,29 @@
// There is a remote chance of livelocks due to this behavior.
if (tupleCopy == null) {
ctx.reset(IndexOp.DELETE);
- lsmHarness.insertUpdateOrDelete(tuple, ctx);
- return;
+ return false;
}
- memBTreeUpdate(tupleCopy, ctx);
+ return memBTreeUpdate(tupleCopy, ctx);
} else {
// Since the existing tuple could be a matter tuple, we must delete it and re-insert.
- memBTreeDeleteAndReinsert(tuple, ctx);
+ return memBTreeDeleteAndReinsert(tuple, ctx);
}
}
- private void insertOrUpdateExistingKey(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException,
+ private boolean insertOrUpdateExistingKey(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException,
TreeIndexException {
// If all fields are keys, and the key we are trying to insert/update
// already exists, then we are already done.
// Otherwise, we must update the non-key fields.
if (cmpFactories.length != memBTree.getFieldCount()) {
- memBTreeUpdate(tuple, ctx);
+ return memBTreeUpdate(tuple, ctx);
} else {
// Since the existing tuple could be an antimatter tuple, we must delete it and re-insert.
- memBTreeDeleteAndReinsert(tuple, ctx);
+ return memBTreeDeleteAndReinsert(tuple, ctx);
}
}
- private void memBTreeDeleteAndReinsert(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException,
+ private boolean memBTreeDeleteAndReinsert(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException,
TreeIndexException {
// All fields are key fields, therefore a true BTree update is not
// allowed.
@@ -236,10 +237,10 @@
}
// Restart performOp to insert the tuple.
ctx.reset(originalOp);
- lsmHarness.insertUpdateOrDelete(tuple, ctx);
+ return false;
}
- private void memBTreeUpdate(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException,
+ private boolean memBTreeUpdate(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException,
TreeIndexException {
IndexOp originalOp = ctx.getIndexOp();
try {
@@ -250,8 +251,9 @@
// Simply restart the operation. There is a remote chance of
// livelocks due to this behavior.
ctx.reset(originalOp);
- lsmHarness.insertUpdateOrDelete(tuple, ctx);
+ return false;
}
+ return true;
}
@Override
@@ -322,13 +324,13 @@
return diskBTree;
}
- public void search(ITreeIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred, IIndexOpContext ictx, boolean includeMemComponent) throws HyracksDataException, TreeIndexException {
+ public void search(ITreeIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred, IIndexOpContext ictx, boolean includeMemComponent, AtomicInteger searcherRefCount) throws HyracksDataException, TreeIndexException {
LSMBTreeOpContext ctx = (LSMBTreeOpContext) ictx;
LSMBTreeRangeSearchCursor lsmTreeCursor = (LSMBTreeRangeSearchCursor) cursor;
int numDiskBTrees = diskComponents.size();
int numBTrees = (includeMemComponent) ? numDiskBTrees + 1 : numDiskBTrees;
LSMBTreeCursorInitialState initialState = new LSMBTreeCursorInitialState(numBTrees,
- insertLeafFrameFactory, ctx.cmp, includeMemComponent, lsmHarness);
+ insertLeafFrameFactory, ctx.cmp, includeMemComponent, searcherRefCount, lsmHarness);
lsmTreeCursor.open(initialState, pred);
int cursorIx;
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java
index 22b85dd..e866326 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
+import java.util.concurrent.atomic.AtomicInteger;
+
import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
@@ -26,15 +28,17 @@
private final int numBTrees;
private final ITreeIndexFrameFactory leafFrameFactory;
private final MultiComparator cmp;
- private final boolean includeMemBTree;
+ private final boolean includeMemComponent;
+ private final AtomicInteger searcherfRefCount;
private final LSMHarness lsmHarness;
public LSMBTreeCursorInitialState(int numBTrees, ITreeIndexFrameFactory leafFrameFactory, MultiComparator cmp,
- boolean includeMemBTree, LSMHarness lsmHarness) {
+ boolean includeMemComponent, AtomicInteger searcherfRefCount, LSMHarness lsmHarness) {
this.numBTrees = numBTrees;
this.leafFrameFactory = leafFrameFactory;
this.cmp = cmp;
- this.includeMemBTree = includeMemBTree;
+ this.includeMemComponent = includeMemComponent;
+ this.searcherfRefCount = searcherfRefCount;
this.lsmHarness = lsmHarness;
}
@@ -59,8 +63,12 @@
public void setPage(ICachedPage page) {
}
- public boolean getIncludeMemBTree() {
- return includeMemBTree;
+ public AtomicInteger getSearcherRefCount() {
+ return searcherfRefCount;
+ }
+
+ public boolean getIncludeMemComponent() {
+ return includeMemComponent;
}
public LSMHarness getLSMHarness() {
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 6f45287..24ba4eb 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -17,6 +17,7 @@
import java.util.Comparator;
import java.util.PriorityQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -39,7 +40,8 @@
private PriorityQueueElement outputElement;
private PriorityQueueElement reusedElement;
private boolean needPush;
- private boolean includeMemBTree;
+ private boolean includeMemComponent;
+ private AtomicInteger searcherfRefCount;
private LSMHarness lsmHarness;
public LSMBTreeRangeSearchCursor() {
@@ -97,7 +99,8 @@
IBTreeLeafFrame leafFrame = (IBTreeLeafFrame) lsmInitialState.getLeafFrameFactory().createFrame();
rangeCursors[i] = new BTreeRangeSearchCursor(leafFrame, false);
}
- includeMemBTree = lsmInitialState.getIncludeMemBTree();
+ includeMemComponent = lsmInitialState.getIncludeMemComponent();
+ searcherfRefCount = lsmInitialState.getSearcherRefCount();
lsmHarness = lsmInitialState.getLSMHarness();
setPriorityQueueComparator();
}
@@ -123,7 +126,7 @@
}
rangeCursors = null;
} finally {
- lsmHarness.closeSearchCursor(includeMemBTree);
+ lsmHarness.closeSearchCursor(searcherfRefCount, includeMemComponent);
}
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMTree.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMTree.java
index 69068e2..e272ec8 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMTree.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMTree.java
@@ -16,6 +16,7 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.api;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -36,11 +37,11 @@
*
*/
public interface ILSMTree extends ITreeIndex {
- public void insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException,
+ public boolean insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException,
TreeIndexException;
public void search(ITreeIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred,
- IIndexOpContext ictx, boolean includeMemComponent) throws HyracksDataException, TreeIndexException;
+ IIndexOpContext ictx, boolean includeMemComponent, AtomicInteger searcherRefCount) throws HyracksDataException, TreeIndexException;
public Object merge(List<Object> mergedComponents) throws HyracksDataException, TreeIndexException;
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index aab1137..47588d0 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -41,7 +41,7 @@
protected final Logger LOGGER = Logger.getLogger(LSMHarness.class.getName());
protected static final long AFTER_MERGE_CLEANUP_SLEEP = 100;
- private ILSMTree lsmTree;
+ private ILSMTree lsmTree;
// All accesses to the LSM-Tree's on-disk components are synchronized on diskComponentsSync.
private Object diskComponentsSync = new Object();
@@ -90,7 +90,7 @@
do {
// Wait for ongoing flush to complete.
synchronized (this) {
- if (!flushFlag) {
+ if (!flushFlag) {
// Increments threadRefCount, to force a flush to wait for this operation to finish.
// (a flush can only begin once threadRefCount == 0).
threadEnter();
@@ -99,8 +99,12 @@
}
}
} while (waitForFlush);
+
+ boolean operationComplete = true;
try {
- lsmTree.insertUpdateOrDelete(tuple, ctx);
+ do {
+ operationComplete = lsmTree.insertUpdateOrDelete(tuple, ctx);
+ } while (!operationComplete);
} finally {
threadExit();
}
@@ -144,15 +148,12 @@
List<Object> diskComponentSnapshot = new ArrayList<Object>();
AtomicInteger localSearcherRefCount = null;
synchronized (diskComponentsSync) {
- diskComponentSnapshot.addAll(lsmTree.getDiskComponents());
- // Only remember the search ref count when performing a merge (i.e., includeMemComponent is false).
- if (!includeMemComponent) {
- localSearcherRefCount = searcherRefCount;
- localSearcherRefCount.incrementAndGet();
- }
+ diskComponentSnapshot.addAll(lsmTree.getDiskComponents());
+ localSearcherRefCount = searcherRefCount;
+ localSearcherRefCount.incrementAndGet();
}
- lsmTree.search(cursor, diskComponentSnapshot, pred, ctx, includeMemComponent);
+ lsmTree.search(cursor, diskComponentSnapshot, pred, ctx, includeMemComponent, localSearcherRefCount);
return diskComponentSnapshot;
}
@@ -176,8 +177,8 @@
// Also, swap the searchRefCount.
synchronized (diskComponentsSync) {
lsmTree.addMergedComponent(newComponent, mergedComponents);
- // Swap the searcher ref count reference, and reset it to zero.
- if (searcherRefCount == searcherRefCountA) {
+ // Swap the searcher ref count reference, and reset it to zero.
+ if (searcherRefCount == searcherRefCountA) {
searcherRefCount = searcherRefCountB;
} else {
searcherRefCount = searcherRefCountA;
@@ -187,7 +188,7 @@
// Wait for all searchers that are still accessing the old on-disk
// Trees, then perform the final cleanup of the old Trees.
- while (localSearcherRefCount.get() != 0) {
+ while (localSearcherRefCount.get() > 0) {
try {
Thread.sleep(AFTER_MERGE_CLEANUP_SLEEP);
} catch (InterruptedException e) {
@@ -203,11 +204,7 @@
isMerging.set(false);
}
- public AtomicInteger getSearcherRefCount() {
- return searcherRefCount;
- }
-
- public void closeSearchCursor(boolean includeMemComponent) throws HyracksDataException {
+ public void closeSearchCursor(AtomicInteger searcherRefCount, boolean includeMemComponent) throws HyracksDataException {
// If the in-memory Tree was not included in the search, then we don't
// need to synchronize with a flush.
if (includeMemComponent) {
@@ -216,10 +213,9 @@
} catch (TreeIndexException e) {
throw new HyracksDataException(e);
}
- } else {
- // Synchronize with ongoing merges.
- searcherRefCount.decrementAndGet();
}
+ // Synchronize with ongoing merges.
+ searcherRefCount.decrementAndGet();
}
public void addBulkLoadedComponent(Object index) {
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 99f082f..17fcc3d 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -22,6 +22,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
+import java.util.concurrent.atomic.AtomicInteger;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -289,7 +290,7 @@
return 0;
}
- public void insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException,
+ public boolean insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException,
TreeIndexException {
LSMRTreeOpContext ctx = (LSMRTreeOpContext) ictx;
if (ctx.getIndexOp() == IndexOp.INSERT) {
@@ -326,10 +327,11 @@
// that all the corresponding insert tuples are deleted
}
}
+ return true;
}
public void search(ITreeIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred,
- IIndexOpContext ictx, boolean includeMemComponent) throws HyracksDataException, TreeIndexException {
+ IIndexOpContext ictx, boolean includeMemComponent, AtomicInteger searcherRefCount) throws HyracksDataException, TreeIndexException {
LSMRTreeOpContext ctx = (LSMRTreeOpContext) ictx;
int numDiskTrees = diskComponents.size();
int numTrees = (includeMemComponent) ? numDiskTrees + 1 : numDiskTrees;
@@ -351,7 +353,7 @@
LSMRTreeSearchCursor lsmRTreeCursor = (LSMRTreeSearchCursor) cursor;
LSMRTreeCursorInitialState initialState = new LSMRTreeCursorInitialState(numTrees, rtreeLeafFrameFactory,
rtreeInteriorFrameFactory, btreeLeafFrameFactory, ctx.getBTreeMultiComparator(), bTreeAccessors,
- includeMemComponent, lsmHarness);
+ searcherRefCount, includeMemComponent, lsmHarness);
lsmRTreeCursor.open(initialState, pred);
int cursorIx;
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeCursorInitialState.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeCursorInitialState.java
index c32bf06..f4a00a0 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeCursorInitialState.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeCursorInitialState.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+import java.util.concurrent.atomic.AtomicInteger;
+
import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
@@ -30,12 +32,13 @@
private ITreeIndexFrameFactory btreeLeafFrameFactory;
private MultiComparator btreeCmp;
private ITreeIndexAccessor[] bTreeAccessors;
+ private AtomicInteger searcherRefCount;
private final boolean includeMemRTree;
private final LSMHarness lsmHarness;
public LSMRTreeCursorInitialState(int numberOfTrees, ITreeIndexFrameFactory rtreeLeafFrameFactory,
ITreeIndexFrameFactory rtreeInteriorFrameFactory, ITreeIndexFrameFactory btreeLeafFrameFactory,
- MultiComparator btreeCmp, ITreeIndexAccessor[] bTreeAccessors, boolean includeMemRTree,
+ MultiComparator btreeCmp, ITreeIndexAccessor[] bTreeAccessors, AtomicInteger searcherRefCount, boolean includeMemRTree,
LSMHarness lsmHarness) {
this.numberOfTrees = numberOfTrees;
this.rtreeLeafFrameFactory = rtreeLeafFrameFactory;
@@ -43,6 +46,7 @@
this.btreeLeafFrameFactory = btreeLeafFrameFactory;
this.btreeCmp = btreeCmp;
this.bTreeAccessors = bTreeAccessors;
+ this.searcherRefCount = searcherRefCount;
this.includeMemRTree = includeMemRTree;
this.lsmHarness = lsmHarness;
}
@@ -83,6 +87,10 @@
public boolean getIncludeMemRTree() {
return includeMemRTree;
}
+
+ public AtomicInteger getSearcherRefCount() {
+ return searcherRefCount;
+ }
public LSMHarness getLSMHarness() {
return lsmHarness;
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
index c8d584c..b277694 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+import java.util.concurrent.atomic.AtomicInteger;
+
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
@@ -43,6 +45,7 @@
private int numberOfTrees;
private RangePredicate btreeRangePredicate;
private ITupleReference frameTuple;
+ private AtomicInteger searcherRefCount;
private boolean includeMemRTree;
private LSMHarness lsmHarness;
private boolean foundNext = false;
@@ -107,6 +110,7 @@
public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
LSMRTreeCursorInitialState lsmInitialState = (LSMRTreeCursorInitialState) initialState;
btreeCmp = lsmInitialState.getBTreeCmp();
+ searcherRefCount = lsmInitialState.getSearcherRefCount();
includeMemRTree = lsmInitialState.getIncludeMemRTree();
lsmHarness = lsmInitialState.getLSMHarness();
numberOfTrees = lsmInitialState.getNumberOfTrees();
@@ -142,7 +146,7 @@
rtreeCursors = null;
btreeCursors = null;
} finally {
- lsmHarness.closeSearchCursor(includeMemRTree);
+ lsmHarness.closeSearchCursor(searcherRefCount, includeMemRTree);
}
}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java
index e3f57e0..1267510 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java
@@ -64,34 +64,31 @@
// Insert only workload.
TestOperation[] insertOnlyOps = new TestOperation[] { TestOperation.INSERT };
- //workloadConfs.add(new TestWorkloadConf(insertOnlyOps, getUniformOpProbs(insertOnlyOps)));
+ workloadConfs.add(new TestWorkloadConf(insertOnlyOps, getUniformOpProbs(insertOnlyOps)));
// Insert and merge workload.
TestOperation[] insertMergeOps = new TestOperation[] { TestOperation.INSERT, TestOperation.MERGE };
- //workloadConfs.add(new TestWorkloadConf(insertMergeOps, getUniformOpProbs(insertMergeOps)));
+ workloadConfs.add(new TestWorkloadConf(insertMergeOps, getUniformOpProbs(insertMergeOps)));
// Inserts mixed with point searches and scans.
TestOperation[] insertSearchOnlyOps = new TestOperation[] { TestOperation.INSERT, TestOperation.POINT_SEARCH, TestOperation.ORDERED_SCAN };
- //workloadConfs.add(new TestWorkloadConf(insertSearchOnlyOps, getUniformOpProbs(insertSearchOnlyOps)));
+ workloadConfs.add(new TestWorkloadConf(insertSearchOnlyOps, getUniformOpProbs(insertSearchOnlyOps)));
// Inserts, updates, and deletes.
TestOperation[] insertDeleteUpdateOps = new TestOperation[] { TestOperation.INSERT, TestOperation.DELETE, TestOperation.UPDATE };
- //workloadConfs.add(new TestWorkloadConf(insertDeleteUpdateOps, getUniformOpProbs(insertDeleteUpdateOps)));
+ workloadConfs.add(new TestWorkloadConf(insertDeleteUpdateOps, getUniformOpProbs(insertDeleteUpdateOps)));
// Inserts, updates, deletes and merges.
TestOperation[] insertDeleteUpdateMergeOps = new TestOperation[] { TestOperation.INSERT, TestOperation.DELETE, TestOperation.UPDATE, TestOperation.MERGE };
- //workloadConfs.add(new TestWorkloadConf(insertDeleteUpdateMergeOps, getUniformOpProbs(insertDeleteUpdateMergeOps)));
+ workloadConfs.add(new TestWorkloadConf(insertDeleteUpdateMergeOps, getUniformOpProbs(insertDeleteUpdateMergeOps)));
// All operations except merge.
TestOperation[] allNoMergeOps = new TestOperation[] { TestOperation.INSERT, TestOperation.DELETE, TestOperation.UPDATE, TestOperation.POINT_SEARCH, TestOperation.ORDERED_SCAN };
- //workloadConfs.add(new TestWorkloadConf(allNoMergeOps, getUniformOpProbs(allNoMergeOps)));
+ workloadConfs.add(new TestWorkloadConf(allNoMergeOps, getUniformOpProbs(allNoMergeOps)));
- // All operations merge.
+ // All operations.
TestOperation[] allOps = new TestOperation[] { TestOperation.INSERT, TestOperation.DELETE, TestOperation.UPDATE, TestOperation.POINT_SEARCH, TestOperation.ORDERED_SCAN, TestOperation.MERGE };
- //workloadConfs.add(new TestWorkloadConf(allOps, getUniformOpProbs(allOps)));
-
- TestOperation[] debugOps = new TestOperation[] { TestOperation.INSERT, TestOperation.POINT_SEARCH, TestOperation.ORDERED_SCAN, TestOperation.MERGE };
- workloadConfs.add(new TestWorkloadConf(debugOps, getUniformOpProbs(debugOps)));
+ workloadConfs.add(new TestWorkloadConf(allOps, getUniformOpProbs(allOps)));
return workloadConfs;
}