removed spinlock in ReferenceCountingOperationTracker in favor of condition variable
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1923 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTracker.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTracker.java
index 5df083d..60af948 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTracker.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTracker.java
@@ -1,5 +1,6 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
/**
@@ -19,12 +20,12 @@
*
* @param index the index for which the operation entered
*/
- public void threadEnter(ILSMIndex index);
+ public void threadEnter(ILSMIndex index) throws HyracksDataException;
/**
* This method is guaranteed to be called just before an operation is finished on the index.
*
* @param index the index for which the operation exited
*/
- public void threadExit(ILSMIndex index);
+ public void threadExit(ILSMIndex index) throws HyracksDataException;
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 890d107..a8065eb 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -81,7 +81,7 @@
this.ioScheduler = ioScheduler;
}
- private void threadExit() {
+ private void threadExit() throws HyracksDataException {
if (!lsmIndex.getFlushController().getFlushStatus(lsmIndex) && lsmIndex.getInMemoryFreePageManager().isFull()) {
lsmIndex.getFlushController().setFlushStatus(lsmIndex, true);
}
@@ -153,7 +153,8 @@
return diskComponentSnapshot;
}
- public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException {
+ public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException,
+ IndexException {
if (!isMerging.compareAndSet(false, true)) {
throw new LSMMergeInProgressException(
"Merge already in progress. Only one merge process allowed at a time.");
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTracker.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTracker.java
deleted file mode 100644
index cd1d394..0000000
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTracker.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
-
-public class RefCountingOperationTracker implements ILSMOperationTracker {
-
- private int threadRefCount = 0;
-
- @Override
- public void threadEnter(ILSMIndex index) {
- boolean waitForFlush = true;
- do {
- synchronized (this) {
- // flushFlag may be set to true even though the flush has not occurred yet.
- // If flushFlag is set, then the flush is queued to occur by the last exiting thread.
- // This operation should wait for that flush to occur before proceeding.
- if (!index.getFlushController().getFlushStatus(index)) {
- // Increment the threadRefCount in order to block the possibility of a concurrent flush.
- // The corresponding threadExit() call is in LSMTreeRangeSearchCursor.close()
- threadRefCount++;
-
- // A flush is not pending, so proceed with the operation.
- waitForFlush = false;
- }
- }
- } while (waitForFlush);
- }
-
- @Override
- public void threadExit(final ILSMIndex index) {
- synchronized (this) {
- threadRefCount--;
-
- // Flush will only be handled by last exiting thread.
- if (index.getFlushController().getFlushStatus(index) && threadRefCount == 0) {
- ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE);
- index.getIOScheduler().scheduleOperation(
- accessor.createFlushOperation(NoOpIOOperationCallback.INSTANCE));
- }
- }
- }
-}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTrackerProvider.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTrackerProvider.java
index f0957da..5963a55 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTrackerProvider.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTrackerProvider.java
@@ -10,7 +10,7 @@
@Override
public ILSMOperationTracker getOperationTracker(IHyracksTaskContext ctx) {
- return new RefCountingOperationTracker();
+ return new ReferenceCountingOperationTracker();
}
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ReferenceCountingOperationTracker.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ReferenceCountingOperationTracker.java
new file mode 100644
index 0000000..a9f4d6d
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ReferenceCountingOperationTracker.java
@@ -0,0 +1,52 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+
+public class ReferenceCountingOperationTracker implements ILSMOperationTracker {
+
+ private int threadRefCount = 0;
+
+ @Override
+ public void threadEnter(ILSMIndex index) throws HyracksDataException {
+ synchronized (this) {
+ // flushFlag may be set to true even though the flush has not occurred yet.
+ // If flushFlag is set, then the flush is queued to occur by the last exiting thread.
+ // This operation should wait for that flush to occur before proceeding.
+ if (!index.getFlushController().getFlushStatus(index)) {
+ // Increment the threadRefCount in order to block the possibility of a concurrent flush.
+ // The corresponding threadExit() call is in LSMTreeRangeSearchCursor.close()
+ } else {
+ try {
+ this.wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ threadRefCount++;
+ }
+ }
+
+ @Override
+ public void threadExit(final ILSMIndex index) {
+ synchronized (this) {
+ threadRefCount--;
+
+ // Flush will only be handled by last exiting thread.
+ if (index.getFlushController().getFlushStatus(index) && threadRefCount == 0) {
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ index.getIOScheduler().scheduleOperation(accessor.createFlushOperation(new ILSMIOOperationCallback() {
+ @Override
+ public void callback() {
+ ReferenceCountingOperationTracker.this.notifyAll();
+ }
+ }));
+ }
+ }
+ }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
index 76c5967..af1326e 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
@@ -40,7 +40,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.FlushController;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ReferenceCountingOperationTracker;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -94,7 +94,7 @@
this.ioScheduler = ImmediateScheduler.INSTANCE;
lsmtree = LSMBTreeUtils.createLSMTree(memBufferCache, memFreePageManager, ioManager, file, bufferCache, fmp,
typeTraits, cmpFactories, new FlushController(), NoMergePolicy.INSTANCE,
- new RefCountingOperationTracker(), ioScheduler);
+ new ReferenceCountingOperationTracker(), ioScheduler);
}
@Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java
index ab34e4b..1dedc88 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java
@@ -40,7 +40,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.FlushController;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ReferenceCountingOperationTracker;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -89,7 +89,7 @@
this.ioScheduler = ImmediateScheduler.INSTANCE;
this.mergePolicy = NoMergePolicy.INSTANCE;
this.flushController = new FlushController();
- this.opTracker = new RefCountingOperationTracker();
+ this.opTracker = new ReferenceCountingOperationTracker();
}
public LSMBTreeTestHarness(int diskPageSize, int diskNumPages, int diskMaxOpenFiles, int memPageSize,
@@ -103,7 +103,7 @@
this.ioScheduler = ImmediateScheduler.INSTANCE;
this.mergePolicy = NoMergePolicy.INSTANCE;
this.flushController = new FlushController();
- this.opTracker = new RefCountingOperationTracker();
+ this.opTracker = new ReferenceCountingOperationTracker();
}
public void setUp() throws HyracksException {
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/common/LSMInvertedIndexTestHarness.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/common/LSMInvertedIndexTestHarness.java
index c529329..2a9c385 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/common/LSMInvertedIndexTestHarness.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/common/LSMInvertedIndexTestHarness.java
@@ -40,7 +40,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.FlushController;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ReferenceCountingOperationTracker;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -87,7 +87,7 @@
this.ioScheduler = ImmediateScheduler.INSTANCE;
this.mergePolicy = NoMergePolicy.INSTANCE;
this.flushController = new FlushController();
- this.opTracker = new RefCountingOperationTracker();
+ this.opTracker = new ReferenceCountingOperationTracker();
}
public LSMInvertedIndexTestHarness(int diskPageSize, int diskNumPages, int diskMaxOpenFiles, int memPageSize,
@@ -101,7 +101,7 @@
this.ioScheduler = ImmediateScheduler.INSTANCE;
this.mergePolicy = NoMergePolicy.INSTANCE;
this.flushController = new FlushController();
- this.opTracker = new RefCountingOperationTracker();
+ this.opTracker = new ReferenceCountingOperationTracker();
}
public void setUp() throws HyracksException {
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java
index 4de1344..0d3a5c1 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java
@@ -41,7 +41,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.FlushController;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ReferenceCountingOperationTracker;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -88,7 +88,7 @@
this.ioScheduler = ImmediateScheduler.INSTANCE;
this.mergePolicy = NoMergePolicy.INSTANCE;
this.flushController = new FlushController();
- this.opTracker = new RefCountingOperationTracker();
+ this.opTracker = new ReferenceCountingOperationTracker();
}
public LSMRTreeTestHarness(int diskPageSize, int diskNumPages, int diskMaxOpenFiles, int memPageSize,
@@ -102,7 +102,7 @@
this.ioScheduler = ImmediateScheduler.INSTANCE;
this.mergePolicy = NoMergePolicy.INSTANCE;
this.flushController = new FlushController();
- this.opTracker = new RefCountingOperationTracker();
+ this.opTracker = new ReferenceCountingOperationTracker();
}
public void setUp() throws HyracksException {