No more spining when cannot enter an lsm component.
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
index 6307012..e814540 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
@@ -25,9 +25,7 @@
@Override
public boolean threadEnter(LSMOperationType opType, boolean isMutableComponent) {
- if (state == ComponentState.INACTIVE) {
- return false;
- }
+ assert state != ComponentState.INACTIVE;
switch (opType) {
case FORCE_MODIFICATION:
@@ -37,6 +35,9 @@
break;
case MERGE:
if (state == ComponentState.READABLE_MERGING) {
+ // This should never happen unless there are two concurrent merges that were scheduled
+ // concurrently and they have interleaving components to be merged.
+ // This should be handled properly by the merge policy, but we guard against that here anyway.
return false;
}
state = ComponentState.READABLE_MERGING;
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java
index 33ad39b..7b96de0 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java
@@ -86,6 +86,9 @@
}
break;
case FLUSH:
+ // There should not be two flush requests of the same component at the same time.
+ assert state != ComponentState.READABLE_UNWRITABLE_FLUSHING
+ && state != ComponentState.UNREADABLE_UNWRITABLE;
if (state == ComponentState.READABLE_WRITABLE || state == ComponentState.READABLE_UNWRITABLE) {
assert writerCount == 0;
state = ComponentState.READABLE_UNWRITABLE_FLUSHING;
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index b2ac681..1cc5143 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -50,18 +50,22 @@
private boolean getAndEnterComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, boolean tryOperation)
throws HyracksDataException {
- boolean entranceSuccessful = false;
-
- while (!entranceSuccessful) {
- synchronized (opTracker) {
+ synchronized (opTracker) {
+ while (true) {
lsmIndex.getOperationalComponents(ctx);
- entranceSuccessful = enterComponents(ctx, opType);
- if (tryOperation && !entranceSuccessful) {
+ boolean entranceSuccessful = enterComponents(ctx, opType);
+ if (entranceSuccessful) {
+ return true;
+ } else if (tryOperation && !entranceSuccessful) {
return false;
}
+ try {
+ opTracker.wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
}
}
- return true;
}
private boolean enterComponents(ILSMIndexOperationContext ctx, LSMOperationType opType) throws HyracksDataException {
@@ -92,19 +96,18 @@
return false;
}
}
- opTracker.beforeOperation(lsmIndex, opType, ctx.getSearchOperationCallback(), ctx.getModificationCallback());
-
// Check if there is any action that is needed to be taken based on the operation type
switch (opType) {
case FLUSH:
// Changing the flush status should *always* precede changing the mutable component.
lsmIndex.changeFlushStatusForCurrentMutableCompoent(false);
lsmIndex.changeMutableComponent();
+ opTracker.notifyAll();
break;
default:
break;
}
-
+ opTracker.beforeOperation(lsmIndex, opType, ctx.getSearchOperationCallback(), ctx.getModificationCallback());
return true;
}
@@ -117,7 +120,6 @@
for (ILSMComponent c : ctx.getComponentHolder()) {
boolean isMutableComponent = i == 0 && c.getType() == LSMComponentType.MEMORY ? true : false;
c.threadExit(opType, failedOperation, isMutableComponent);
-
if (c.getType() == LSMComponentType.MEMORY) {
switch (c.getState()) {
case READABLE_UNWRITABLE:
@@ -128,6 +130,7 @@
break;
case INACTIVE:
((AbstractMemoryLSMComponent) c).reset();
+ opTracker.notifyAll();
break;
default:
break;
@@ -143,7 +146,6 @@
}
i++;
}
-
// Then, perform any action that is needed to be taken based on the operation type.
switch (opType) {
case FLUSH:
@@ -162,7 +164,6 @@
default:
break;
}
-
} finally {
opTracker.afterOperation(lsmIndex, opType, ctx.getSearchOperationCallback(),
ctx.getModificationCallback());
@@ -269,6 +270,7 @@
public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException, IndexException {
LSMOperationType opType = LSMOperationType.MERGE;
+ // Merge should always be a try operation, because it should never fail to enter the components unless the merge policy is erroneous.
if (!getAndEnterComponents(ctx, opType, true)) {
return;
}