Cleaned up lsm-btree concurrency.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1036 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsmtree-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/impls/LSMTree.java b/hyracks-storage-am-lsmtree-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/impls/LSMTree.java
index a7b9cc5..76e1284 100644
--- a/hyracks-storage-am-lsmtree-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/impls/LSMTree.java
+++ b/hyracks-storage-am-lsmtree-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/impls/LSMTree.java
@@ -55,7 +55,7 @@
     private int inDiskTreeCounter;
     private final BTreeFactory bTreeFactory;
     private final IFileMapManager fileMapManager;
-    private int threadReferenceCounter;
+    private int threadRefCount;
     private boolean flushFlag;
 
     public LSMTree(IBufferCache memCache, IBufferCache bufferCache, int fieldCount, MultiComparator cmp,
@@ -72,7 +72,7 @@
         this.inDiskTreeInfoList = new LinkedList<InDiskTreeInfo>();
         this.inDiskTreeCounter = 0;
         this.fileMapManager = fileMapManager;
-        this.threadReferenceCounter = 0;
+        this.threadRefCount = 0;
         this.created = false;
         this.flushFlag = false;
 
@@ -114,58 +114,38 @@
         do {
             synchronized (this) {
                 if (!flushFlag) {
-                    threadReferenceCounter++;
+                    threadEnter();
                     waitForFlush = false;
                 }
             }
         } while (waitForFlush == true);
         try {
             ctx.memBtreeAccessor.insert(tuple);
-            decreaseThreadReferenceCounter();
         } catch (BTreeDuplicateKeyException e) {
             ctx.reset(IndexOp.UPDATE);
             // We don't need to deal with a nonexistent key here, because a
             // deleter will actually update the key and it's value, and not
             // delete it from the BTree.
-            ctx.memBtreeAccessor.update(tuple);
-            decreaseThreadReferenceCounter();
-        } 
-        // Check if we've reached or exceeded the maximum number of pages.
-        // Note: It doesn't matter if this inserter or another concurrent
-        // inserter caused the overflow.
-        // The first inserter reaching this code, should set the flush flag.
-        if (memFreePageManager.isFull()) {
-            // Force concurrent inserters to wait, possibly until a flush has completed. 
-            synchronized (this) {
-                // If flushFlag is false it means we are the first inserter to
-                // trigger the flush. If flushFlag is already set to true,
-                // there's no harm in setting it to true again.
-                flushFlag = true;
-                threadReferenceCounter--;
-                if (threadReferenceCounter == 0) {
-                    flushInMemoryBtree();
-                    ctx.reset();
-                    ctx.memBtreeAccessor.insert(tuple);
-                    flushFlag = false;
-                    return;
-                } else if (threadReferenceCounter < 0) {
-                    throw new Error("Thread reference counter is below zero. This indicates a programming error!");
-                }
-            }
+            ctx.memBtreeAccessor.update(tuple);    
         }
+        threadExit();
     }
 
-    public void decreaseThreadReferenceCounter() throws Exception {
+    public void threadEnter() {
+        threadRefCount++;
+    }
+    
+    public void threadExit() throws Exception {
         synchronized (this) {
-            threadReferenceCounter--;
-            if (flushFlag == true) {
-                if (threadReferenceCounter == 0) {
-                    flushInMemoryBtree();
-                    flushFlag = false;
-                    return;
-                } else if (threadReferenceCounter < 0) {
-                    throw new Error("Thread reference counter is below zero. This indicates a programming error!");
-                }
+            threadRefCount--;
+            // Check if we've reached or exceeded the maximum number of pages.
+            if (!flushFlag && memFreePageManager.isFull()) {
+                flushFlag = true;
+            }
+            // Flush will only be handled by last exiting thread.
+            if (flushFlag && threadRefCount == 0) {
+                flushInMemoryBtree();
+                flushFlag = false;
             }
         }
     }
@@ -263,7 +243,7 @@
         while (continuePerformOp == false) {
             synchronized (this) {
                 if (!flushFlag) {
-                    threadReferenceCounter++;
+                    threadRefCount++;
                     continuePerformOp = true;
                 }
             }
@@ -312,7 +292,7 @@
         while (continuePerformOp == false) {
             synchronized (this) {
                 if (!flushFlag) {
-                    threadReferenceCounter++;
+                    threadRefCount++;
                     continuePerformOp = true;
                 }
             }
diff --git a/hyracks-storage-am-lsmtree-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/impls/LSMTreeRangeSearchCursor.java b/hyracks-storage-am-lsmtree-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/impls/LSMTreeRangeSearchCursor.java
index 079dada..03253c3 100644
--- a/hyracks-storage-am-lsmtree-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/impls/LSMTreeRangeSearchCursor.java
+++ b/hyracks-storage-am-lsmtree-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/impls/LSMTreeRangeSearchCursor.java
@@ -91,7 +91,7 @@
 
     @Override
     public void close() throws Exception {
-        lsm.decreaseThreadReferenceCounter();
+        lsm.threadExit();
         outputPriorityQueue.clear();
         for (int i = 0; i < numberOfTrees; i++) {
             rangeCursors[i].close();