Cleaned up lsm-btree concurrency.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1036 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsmtree-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/impls/LSMTree.java b/hyracks-storage-am-lsmtree-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/impls/LSMTree.java
index a7b9cc5..76e1284 100644
--- a/hyracks-storage-am-lsmtree-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/impls/LSMTree.java
+++ b/hyracks-storage-am-lsmtree-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/impls/LSMTree.java
@@ -55,7 +55,7 @@
private int inDiskTreeCounter;
private final BTreeFactory bTreeFactory;
private final IFileMapManager fileMapManager;
- private int threadReferenceCounter;
+ private int threadRefCount;
private boolean flushFlag;
public LSMTree(IBufferCache memCache, IBufferCache bufferCache, int fieldCount, MultiComparator cmp,
@@ -72,7 +72,7 @@
this.inDiskTreeInfoList = new LinkedList<InDiskTreeInfo>();
this.inDiskTreeCounter = 0;
this.fileMapManager = fileMapManager;
- this.threadReferenceCounter = 0;
+ this.threadRefCount = 0;
this.created = false;
this.flushFlag = false;
@@ -114,58 +114,38 @@
do {
synchronized (this) {
if (!flushFlag) {
- threadReferenceCounter++;
+ threadEnter();
waitForFlush = false;
}
}
} while (waitForFlush == true);
try {
ctx.memBtreeAccessor.insert(tuple);
- decreaseThreadReferenceCounter();
} catch (BTreeDuplicateKeyException e) {
ctx.reset(IndexOp.UPDATE);
// We don't need to deal with a nonexistent key here, because a
// deleter will actually update the key and it's value, and not
// delete it from the BTree.
- ctx.memBtreeAccessor.update(tuple);
- decreaseThreadReferenceCounter();
- }
- // Check if we've reached or exceeded the maximum number of pages.
- // Note: It doesn't matter if this inserter or another concurrent
- // inserter caused the overflow.
- // The first inserter reaching this code, should set the flush flag.
- if (memFreePageManager.isFull()) {
- // Force concurrent inserters to wait, possibly until a flush has completed.
- synchronized (this) {
- // If flushFlag is false it means we are the first inserter to
- // trigger the flush. If flushFlag is already set to true,
- // there's no harm in setting it to true again.
- flushFlag = true;
- threadReferenceCounter--;
- if (threadReferenceCounter == 0) {
- flushInMemoryBtree();
- ctx.reset();
- ctx.memBtreeAccessor.insert(tuple);
- flushFlag = false;
- return;
- } else if (threadReferenceCounter < 0) {
- throw new Error("Thread reference counter is below zero. This indicates a programming error!");
- }
- }
+ ctx.memBtreeAccessor.update(tuple);
}
+ threadExit();
}
- public void decreaseThreadReferenceCounter() throws Exception {
+ public void threadEnter() {
+ threadRefCount++;
+ }
+
+ public void threadExit() throws Exception {
synchronized (this) {
- threadReferenceCounter--;
- if (flushFlag == true) {
- if (threadReferenceCounter == 0) {
- flushInMemoryBtree();
- flushFlag = false;
- return;
- } else if (threadReferenceCounter < 0) {
- throw new Error("Thread reference counter is below zero. This indicates a programming error!");
- }
+ threadRefCount--;
+ // Check if we've reached or exceeded the maximum number of pages.
+ if (!flushFlag && memFreePageManager.isFull()) {
+ flushFlag = true;
+ }
+ // Flush will only be handled by last exiting thread.
+ if (flushFlag && threadRefCount == 0) {
+ flushInMemoryBtree();
+ flushFlag = false;
}
}
}
@@ -263,7 +243,7 @@
while (continuePerformOp == false) {
synchronized (this) {
if (!flushFlag) {
- threadReferenceCounter++;
+ threadRefCount++;
continuePerformOp = true;
}
}
@@ -312,7 +292,7 @@
while (continuePerformOp == false) {
synchronized (this) {
if (!flushFlag) {
- threadReferenceCounter++;
+ threadRefCount++;
continuePerformOp = true;
}
}
diff --git a/hyracks-storage-am-lsmtree-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/impls/LSMTreeRangeSearchCursor.java b/hyracks-storage-am-lsmtree-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/impls/LSMTreeRangeSearchCursor.java
index 079dada..03253c3 100644
--- a/hyracks-storage-am-lsmtree-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/impls/LSMTreeRangeSearchCursor.java
+++ b/hyracks-storage-am-lsmtree-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/impls/LSMTreeRangeSearchCursor.java
@@ -91,7 +91,7 @@
@Override
public void close() throws Exception {
- lsm.decreaseThreadReferenceCounter();
+ lsm.threadExit();
outputPriorityQueue.clear();
for (int i = 0; i < numberOfTrees; i++) {
rangeCursors[i].close();