[ASTERIXDB-2118][STO] Ensure flush ordering of memory components
- user model changes: no
- storage format changes: no
- interface changes: no
- Fix the bug of AsynchronousScheduler by waking up the next
flush only when the current operation is a FLUSH operation
Change-Id: I7de4a1625fdd3faaa07f65be2ebc714ec7564b29
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2038
Reviewed-by: Ian Maxon <imaxon@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
index 5f6766f..438bb0b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
@@ -60,19 +60,21 @@
super.afterExecute(r, t);
LSMIOOperationTask<Boolean> task = (LSMIOOperationTask<Boolean>) r;
ILSMIOOperation executedOp = task.getOperation();
- String id = executedOp.getIndexIdentifier();
- synchronized (this) {
- runningFlushOperations.remove(id);
- if (waitingFlushOperations.containsKey(id)) {
- try {
- ILSMIOOperation op = waitingFlushOperations.get(id).poll();
- if (op != null) {
- scheduleOperation(op);
- } else {
- waitingFlushOperations.remove(id);
+ if (executedOp.getIOOpertionType() == LSMIOOpertionType.FLUSH) {
+ String id = executedOp.getIndexIdentifier();
+ synchronized (this) {
+ runningFlushOperations.remove(id);
+ if (waitingFlushOperations.containsKey(id)) {
+ try {
+ ILSMIOOperation op = waitingFlushOperations.get(id).poll();
+ if (op != null) {
+ scheduleOperation(op);
+ } else {
+ waitingFlushOperations.remove(id);
+ }
+ } catch (HyracksDataException e) {
+ t = e.getCause();
}
- } catch (HyracksDataException e) {
- t = e.getCause();
}
}
}
@@ -84,7 +86,7 @@
public void scheduleOperation(ILSMIOOperation operation) throws HyracksDataException {
if (operation.getIOOpertionType() == LSMIOOpertionType.MERGE) {
executor.submit(operation);
- } else {
+ } else if (operation.getIOOpertionType() == LSMIOOpertionType.FLUSH) {
String id = operation.getIndexIdentifier();
synchronized (executor) {
if (runningFlushOperations.containsKey(id)) {
@@ -100,6 +102,10 @@
executor.submit(operation);
}
}
+ } else {
+ // this should never happen
+ // just guard here to avoid silient failures in case of future extensions
+ throw new IllegalArgumentException("Unknown operation type " + operation.getIOOpertionType());
}
}
}