removed spinlock in ReferenceCountingOperationTracker in favor of condition variable

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1923 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTracker.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTracker.java
index 5df083d..60af948 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTracker.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTracker.java
@@ -1,5 +1,6 @@
 package edu.uci.ics.hyracks.storage.am.lsm.common.api;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
 
 /**
@@ -19,12 +20,12 @@
      * 
      * @param index the index for which the operation entered
      */
-    public void threadEnter(ILSMIndex index);
+    public void threadEnter(ILSMIndex index) throws HyracksDataException;
 
     /**
      * This method is guaranteed to be called just before an operation is finished on the index.
      * 
      * @param index the index for which the operation exited
      */
-    public void threadExit(ILSMIndex index);
+    public void threadExit(ILSMIndex index) throws HyracksDataException;
 }
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 890d107..a8065eb 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -81,7 +81,7 @@
         this.ioScheduler = ioScheduler;
     }
 
-    private void threadExit() {
+    private void threadExit() throws HyracksDataException {
         if (!lsmIndex.getFlushController().getFlushStatus(lsmIndex) && lsmIndex.getInMemoryFreePageManager().isFull()) {
             lsmIndex.getFlushController().setFlushStatus(lsmIndex, true);
         }
@@ -153,7 +153,8 @@
         return diskComponentSnapshot;
     }
 
-    public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException {
+    public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException,
+            IndexException {
         if (!isMerging.compareAndSet(false, true)) {
             throw new LSMMergeInProgressException(
                     "Merge already in progress. Only one merge process allowed at a time.");
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTracker.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTracker.java
deleted file mode 100644
index cd1d394..0000000
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTracker.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
-
-public class RefCountingOperationTracker implements ILSMOperationTracker {
-
-    private int threadRefCount = 0;
-
-    @Override
-    public void threadEnter(ILSMIndex index) {
-        boolean waitForFlush = true;
-        do {
-            synchronized (this) {
-                // flushFlag may be set to true even though the flush has not occurred yet.
-                // If flushFlag is set, then the flush is queued to occur by the last exiting thread.
-                // This operation should wait for that flush to occur before proceeding.
-                if (!index.getFlushController().getFlushStatus(index)) {
-                    // Increment the threadRefCount in order to block the possibility of a concurrent flush.
-                    // The corresponding threadExit() call is in LSMTreeRangeSearchCursor.close()
-                    threadRefCount++;
-
-                    // A flush is not pending, so proceed with the operation.
-                    waitForFlush = false;
-                }
-            }
-        } while (waitForFlush);
-    }
-
-    @Override
-    public void threadExit(final ILSMIndex index) {
-        synchronized (this) {
-            threadRefCount--;
-
-            // Flush will only be handled by last exiting thread.
-            if (index.getFlushController().getFlushStatus(index) && threadRefCount == 0) {
-                ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
-                        NoOpOperationCallback.INSTANCE);
-                index.getIOScheduler().scheduleOperation(
-                        accessor.createFlushOperation(NoOpIOOperationCallback.INSTANCE));
-            }
-        }
-    }
-}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTrackerProvider.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTrackerProvider.java
index f0957da..5963a55 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTrackerProvider.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTrackerProvider.java
@@ -10,7 +10,7 @@
 
     @Override
     public ILSMOperationTracker getOperationTracker(IHyracksTaskContext ctx) {
-        return new RefCountingOperationTracker();
+        return new ReferenceCountingOperationTracker();
     }
 
 }
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ReferenceCountingOperationTracker.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ReferenceCountingOperationTracker.java
new file mode 100644
index 0000000..a9f4d6d
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ReferenceCountingOperationTracker.java
@@ -0,0 +1,52 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+
+public class ReferenceCountingOperationTracker implements ILSMOperationTracker {
+
+    private int threadRefCount = 0;
+
+    @Override
+    public void threadEnter(ILSMIndex index) throws HyracksDataException {
+        synchronized (this) {
+            // flushFlag may be set to true even though the flush has not occurred yet.
+            // If flushFlag is set, then the flush is queued to occur by the last exiting thread.
+            // This operation should wait for that flush to occur before proceeding.
+            if (!index.getFlushController().getFlushStatus(index)) {
+                // Increment the threadRefCount in order to block the possibility of a concurrent flush.
+                // The corresponding threadExit() call is in LSMTreeRangeSearchCursor.close()
+            } else {
+                try {
+                    this.wait();
+                } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+            threadRefCount++;
+        }
+    }
+
+    @Override
+    public void threadExit(final ILSMIndex index) {
+        synchronized (this) {
+            threadRefCount--;
+
+            // Flush will only be handled by last exiting thread.
+            if (index.getFlushController().getFlushStatus(index) && threadRefCount == 0) {
+                ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+                        NoOpOperationCallback.INSTANCE);
+                index.getIOScheduler().scheduleOperation(accessor.createFlushOperation(new ILSMIOOperationCallback() {
+                    @Override
+                    public void callback() {
+                        ReferenceCountingOperationTracker.this.notifyAll();
+                    }
+                }));
+            }
+        }
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
index 76c5967..af1326e 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
@@ -40,7 +40,7 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.FlushController;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ReferenceCountingOperationTracker;
 import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -94,7 +94,7 @@
         this.ioScheduler = ImmediateScheduler.INSTANCE;
         lsmtree = LSMBTreeUtils.createLSMTree(memBufferCache, memFreePageManager, ioManager, file, bufferCache, fmp,
                 typeTraits, cmpFactories, new FlushController(), NoMergePolicy.INSTANCE,
-                new RefCountingOperationTracker(), ioScheduler);
+                new ReferenceCountingOperationTracker(), ioScheduler);
     }
 
     @Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java
index ab34e4b..1dedc88 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java
@@ -40,7 +40,7 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.FlushController;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ReferenceCountingOperationTracker;
 import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -89,7 +89,7 @@
         this.ioScheduler = ImmediateScheduler.INSTANCE;
         this.mergePolicy = NoMergePolicy.INSTANCE;
         this.flushController = new FlushController();
-        this.opTracker = new RefCountingOperationTracker();
+        this.opTracker = new ReferenceCountingOperationTracker();
     }
 
     public LSMBTreeTestHarness(int diskPageSize, int diskNumPages, int diskMaxOpenFiles, int memPageSize,
@@ -103,7 +103,7 @@
         this.ioScheduler = ImmediateScheduler.INSTANCE;
         this.mergePolicy = NoMergePolicy.INSTANCE;
         this.flushController = new FlushController();
-        this.opTracker = new RefCountingOperationTracker();
+        this.opTracker = new ReferenceCountingOperationTracker();
     }
 
     public void setUp() throws HyracksException {
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/common/LSMInvertedIndexTestHarness.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/common/LSMInvertedIndexTestHarness.java
index c529329..2a9c385 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/common/LSMInvertedIndexTestHarness.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/common/LSMInvertedIndexTestHarness.java
@@ -40,7 +40,7 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.FlushController;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ReferenceCountingOperationTracker;
 import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -87,7 +87,7 @@
         this.ioScheduler = ImmediateScheduler.INSTANCE;
         this.mergePolicy = NoMergePolicy.INSTANCE;
         this.flushController = new FlushController();
-        this.opTracker = new RefCountingOperationTracker();
+        this.opTracker = new ReferenceCountingOperationTracker();
     }
 
     public LSMInvertedIndexTestHarness(int diskPageSize, int diskNumPages, int diskMaxOpenFiles, int memPageSize,
@@ -101,7 +101,7 @@
         this.ioScheduler = ImmediateScheduler.INSTANCE;
         this.mergePolicy = NoMergePolicy.INSTANCE;
         this.flushController = new FlushController();
-        this.opTracker = new RefCountingOperationTracker();
+        this.opTracker = new ReferenceCountingOperationTracker();
     }
 
     public void setUp() throws HyracksException {
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java
index 4de1344..0d3a5c1 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java
@@ -41,7 +41,7 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.FlushController;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ReferenceCountingOperationTracker;
 import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -88,7 +88,7 @@
         this.ioScheduler = ImmediateScheduler.INSTANCE;
         this.mergePolicy = NoMergePolicy.INSTANCE;
         this.flushController = new FlushController();
-        this.opTracker = new RefCountingOperationTracker();
+        this.opTracker = new ReferenceCountingOperationTracker();
     }
 
     public LSMRTreeTestHarness(int diskPageSize, int diskNumPages, int diskMaxOpenFiles, int memPageSize,
@@ -102,7 +102,7 @@
         this.ioScheduler = ImmediateScheduler.INSTANCE;
         this.mergePolicy = NoMergePolicy.INSTANCE;
         this.flushController = new FlushController();
-        this.opTracker = new RefCountingOperationTracker();
+        this.opTracker = new ReferenceCountingOperationTracker();
     }
 
     public void setUp() throws HyracksException {