[ASTERIXDB-2813] Limit the number of flush/merge threads
- user model changes: no
- storage format changes: no
- interface changes: yes.
Details:
- Limit the number of flush/merge threads by introducing
the following parameters.
- storage.max.running.flushes.per.partition: the maximum
number of running flushes for each partition.
- storage.max.scheduled.merge.per.partition: the maximum
number of scheduled merges for each partition. This is
mainly used by the greedy scheduler.
- storage.max.running.merges.per.partition: the maximum
number of running mergese per partition.
- Basically, we limit the number of flush/merge threads
and put newly created flush/merge opreations into a wait
queue if the limit is reached.
- For the greedy scheduler, the scheduled merges
(i.e., merge threads) are more than the running merges
so that the scheduler can pick the smallest merge
for each LSM-tree.
Change-Id: I85a55423a1438b1d534c2e6a5968e675a99884c8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/9183
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index e058d39..a9a3a3e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -582,20 +582,26 @@
private ILSMIOOperationScheduler createIoScheduler(StorageProperties properties) {
String schedulerName = storageProperties.getIoScheduler();
+ int numPartitions = ioManager.getIODevices().size();
+
+ int maxRunningFlushes = storageProperties.getMaxRunningFlushes(numPartitions);
+ int maxScheduledMerges = storageProperties.getMaxScheduledMerges(numPartitions);
+ int maxRunningMerges = storageProperties.getMaxRunningMerges(numPartitions);
+
ILSMIOOperationScheduler ioScheduler = null;
if (AsynchronousScheduler.FACTORY.getName().equalsIgnoreCase(schedulerName)) {
ioScheduler = AsynchronousScheduler.FACTORY.createIoScheduler(getServiceContext().getThreadFactory(),
- HaltCallback.INSTANCE);
+ HaltCallback.INSTANCE, maxRunningFlushes, maxScheduledMerges, maxRunningMerges);
} else if (GreedyScheduler.FACTORY.getName().equalsIgnoreCase(schedulerName)) {
ioScheduler = GreedyScheduler.FACTORY.createIoScheduler(getServiceContext().getThreadFactory(),
- HaltCallback.INSTANCE);
+ HaltCallback.INSTANCE, maxRunningFlushes, maxScheduledMerges, maxRunningMerges);
} else {
if (LOGGER.isWarnEnabled()) {
LOGGER.log(Level.WARN,
"Unknown storage I/O scheduler: " + schedulerName + "; defaulting to greedy I/O scheduler.");
}
ioScheduler = GreedyScheduler.FACTORY.createIoScheduler(getServiceContext().getThreadFactory(),
- HaltCallback.INSTANCE);
+ HaltCallback.INSTANCE, maxRunningFlushes, maxScheduledMerges, maxRunningMerges);
}
return ioScheduler;
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
index fe73baf..c3a6839 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
@@ -173,7 +173,7 @@
public void operationFailed(ILSMIOOperation operation, Throwable t) {
LOGGER.warn("IO Operation failed", t);
}
- }));
+ }, Integer.MAX_VALUE, Integer.MAX_VALUE));
dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
index a99a306..fc33b1a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
@@ -55,7 +55,10 @@
STORAGE_COMPRESSION_BLOCK(STRING, "snappy"),
STORAGE_DISK_FORCE_BYTES(LONG_BYTE_UNIT, StorageUtil.getLongSizeInBytes(16, MEGABYTE)),
STORAGE_IO_SCHEDULER(STRING, "greedy"),
- STORAGE_WRITE_RATE_LIMIT(LONG_BYTE_UNIT, 0l);
+ STORAGE_WRITE_RATE_LIMIT(LONG_BYTE_UNIT, 0l),
+ STORAGE_MAX_RUNNING_FLUSHES_PER_PARTITION(NONNEGATIVE_INTEGER, 2),
+ STORAGE_MAX_SCHEDULED_MERGES_PER_PARTITION(NONNEGATIVE_INTEGER, 8),
+ STORAGE_MAX_RUNNING_MERGES_PER_PARTITION(NONNEGATIVE_INTEGER, 2);
private final IOptionType interpreter;
private final Object defaultValue;
@@ -111,6 +114,12 @@
return "The number of bytes before each disk force (fsync)";
case STORAGE_IO_SCHEDULER:
return "The I/O scheduler for LSM flush and merge operations";
+ case STORAGE_MAX_RUNNING_FLUSHES_PER_PARTITION:
+ return "The maximum number of running flushes per partition (0 means unlimited)";
+ case STORAGE_MAX_SCHEDULED_MERGES_PER_PARTITION:
+ return "The maximum number of scheduled merges per partition (0 means unlimited)";
+ case STORAGE_MAX_RUNNING_MERGES_PER_PARTITION:
+ return "The maximum number of running merges per partition (0 means unlimited)";
default:
throw new IllegalStateException("NYI: " + this);
}
@@ -204,6 +213,21 @@
return accessor.getString(Option.STORAGE_IO_SCHEDULER);
}
+ public int getMaxRunningFlushes(int numPartitions) {
+ int value = accessor.getInt(Option.STORAGE_MAX_RUNNING_FLUSHES_PER_PARTITION);
+ return value != 0 ? value * numPartitions : Integer.MAX_VALUE;
+ }
+
+ public int getMaxScheduledMerges(int numPartitions) {
+ int value = accessor.getInt(Option.STORAGE_MAX_SCHEDULED_MERGES_PER_PARTITION);
+ return value != 0 ? value * numPartitions : Integer.MAX_VALUE;
+ }
+
+ public int getMaxRunningMerges(int numPartitions) {
+ int value = accessor.getInt(Option.STORAGE_MAX_RUNNING_MERGES_PER_PARTITION);
+ return value != 0 ? value * numPartitions : Integer.MAX_VALUE;
+ }
+
protected int getMetadataDatasets() {
return MetadataIndexImmutableProperties.METADATA_DATASETS_COUNT;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
index 753d27a..40998b7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
@@ -177,4 +177,8 @@
*/
boolean isActive();
+ /**
+ * @return whether this IO operation is completed
+ */
+ boolean isCompleted();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationSchedulerFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationSchedulerFactory.java
index 1c8a4e1..36bfc5a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationSchedulerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationSchedulerFactory.java
@@ -21,7 +21,8 @@
import java.util.concurrent.ThreadFactory;
public interface ILSMIOOperationSchedulerFactory {
- ILSMIOOperationScheduler createIoScheduler(ThreadFactory threadFactory, IIoOperationFailedCallback callback);
+ ILSMIOOperationScheduler createIoScheduler(ThreadFactory threadFactory, IIoOperationFailedCallback callback,
+ int maxNumRunningFlushes, int maxNumScheduledMerges, int maxNumRunningMerges);
String getName();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java
index 78185f0..e266a6f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java
@@ -27,6 +27,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
@@ -34,13 +35,18 @@
public abstract class AbstractAsynchronousScheduler implements ILSMIOOperationScheduler, Closeable {
protected final ExecutorService executor;
+
+ private final int maxNumFlushes;
protected final Map<String, ILSMIOOperation> runningFlushOperations = new HashMap<>();
- protected final Map<String, Deque<ILSMIOOperation>> waitingFlushOperations = new HashMap<>();
+ protected final Deque<ILSMIOOperation> waitingFlushOperations = new ArrayDeque<>();
+ protected final Deque<ILSMIOOperation> waitingMergeOperations = new ArrayDeque<>();
+
protected final Map<String, Throwable> failedGroups = new HashMap<>();
- public AbstractAsynchronousScheduler(ThreadFactory threadFactory, final IIoOperationFailedCallback callback) {
- executor = new IoOperationExecutor(threadFactory, this, callback, runningFlushOperations,
- waitingFlushOperations, failedGroups);
+ public AbstractAsynchronousScheduler(ThreadFactory threadFactory, final IIoOperationFailedCallback callback,
+ int maxNumFlushes) {
+ executor = new IoOperationExecutor(threadFactory, this, callback, runningFlushOperations, failedGroups);
+ this.maxNumFlushes = maxNumFlushes;
}
@Override
@@ -61,27 +67,35 @@
}
}
+ @Override
+ public void completeOperation(ILSMIOOperation operation) throws HyracksDataException {
+ switch (operation.getIOOpertionType()) {
+ case FLUSH:
+ completeFlush(operation);
+ break;
+ case MERGE:
+ completeMerge(operation);
+ case NOOP:
+ return;
+ default:
+ // this should never happen
+ // just guard here to avoid silent failures in case of future extensions
+ throw new IllegalArgumentException("Unknown operation type " + operation.getIOOpertionType());
+ }
+ }
+
protected abstract void scheduleMerge(ILSMIOOperation operation);
+ protected abstract void completeMerge(ILSMIOOperation operation);
+
protected void scheduleFlush(ILSMIOOperation operation) {
String id = operation.getIndexIdentifier();
synchronized (executor) {
- if (failedGroups.containsKey(id)) {
- // Group failure. Fail the operation right away
- operation.setStatus(LSMIOOperationStatus.FAILURE);
- operation.setFailure(new RuntimeException("Operation group " + id + " has permanently failed",
- failedGroups.get(id)));
- operation.complete();
+ if (checkFailedFlush(operation)) {
return;
}
- if (runningFlushOperations.containsKey(id)) {
- if (waitingFlushOperations.containsKey(id)) {
- waitingFlushOperations.get(id).offer(operation);
- } else {
- Deque<ILSMIOOperation> q = new ArrayDeque<>();
- q.offer(operation);
- waitingFlushOperations.put(id, q);
- }
+ if (runningFlushOperations.size() >= maxNumFlushes || runningFlushOperations.containsKey(id)) {
+ waitingFlushOperations.add(operation);
} else {
runningFlushOperations.put(id, operation);
executor.submit(operation);
@@ -89,6 +103,52 @@
}
}
+ private boolean checkFailedFlush(ILSMIOOperation operation) {
+ String id = operation.getIndexIdentifier();
+ if (failedGroups.containsKey(id)) {
+ // Group failure. Fail the operation right away
+ operation.setStatus(LSMIOOperationStatus.FAILURE);
+ operation.setFailure(
+ new RuntimeException("Operation group " + id + " has permanently failed", failedGroups.get(id)));
+ operation.complete();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private void completeFlush(ILSMIOOperation operation) {
+ String id = operation.getIndexIdentifier();
+ synchronized (executor) {
+ runningFlushOperations.remove(id);
+
+ // Schedule flushes in FIFO order. Must make sure that there is at most one scheduled flush for each index.
+ for (ILSMIOOperation flushOp : waitingFlushOperations) {
+ String flushOpId = flushOp.getIndexIdentifier();
+ if (runningFlushOperations.size() < maxNumFlushes) {
+ if (!runningFlushOperations.containsKey(flushOpId) && !flushOp.isCompleted()
+ && !checkFailedFlush(flushOp)) {
+ runningFlushOperations.put(flushOpId, flushOp);
+ executor.submit(flushOp);
+ }
+ } else {
+ break;
+ }
+ }
+
+ // cleanup scheduled flushes
+ while (!waitingFlushOperations.isEmpty()) {
+ ILSMIOOperation top = waitingFlushOperations.peek();
+ if (top.isCompleted() || runningFlushOperations.get(top.getIndexIdentifier()) == top) {
+ waitingFlushOperations.poll();
+ } else {
+ break;
+ }
+ }
+
+ }
+ }
+
@Override
public void close() throws IOException {
executor.shutdown();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
index 0938b5f..8317ca7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
@@ -202,6 +202,11 @@
return isActive.get();
}
+ @Override
+ public synchronized boolean isCompleted() {
+ return completed;
+ }
+
public void waitIfPaused() throws HyracksDataException {
synchronized (this) {
while (!isActive.get()) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
index ac3481c..afd9a49 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
@@ -35,26 +35,49 @@
public static final ILSMIOOperationSchedulerFactory FACTORY = new ILSMIOOperationSchedulerFactory() {
@Override
public ILSMIOOperationScheduler createIoScheduler(ThreadFactory threadFactory,
- IIoOperationFailedCallback callback) {
- return new AsynchronousScheduler(threadFactory, callback);
+ IIoOperationFailedCallback callback, int maxNumRunningFlushes, int maxNumScheduledMerges,
+ int maxNumRunningMerges) {
+ return new AsynchronousScheduler(threadFactory, callback, maxNumRunningFlushes, maxNumRunningMerges);
}
+ @Override
public String getName() {
return "async";
}
};
- public AsynchronousScheduler(ThreadFactory threadFactory, IIoOperationFailedCallback callback) {
- super(threadFactory, callback);
+ private final int maxNumRunningMerges;
+ private int numRunningMerges = 0;
+
+ public AsynchronousScheduler(ThreadFactory threadFactory, IIoOperationFailedCallback callback,
+ int maxNumRunningFlushes, int maxNumRunningMerges) {
+ super(threadFactory, callback, maxNumRunningFlushes);
+ this.maxNumRunningMerges = maxNumRunningMerges;
}
@Override
protected void scheduleMerge(ILSMIOOperation operation) {
- executor.submit(operation);
+ synchronized (executor) {
+ if (numRunningMerges >= maxNumRunningMerges) {
+ waitingMergeOperations.add(operation);
+ } else {
+ doScheduleMerge(operation);
+ }
+ }
}
@Override
- public void completeOperation(ILSMIOOperation operation) {
- // no op
+ protected void completeMerge(ILSMIOOperation operation) {
+ synchronized (executor) {
+ --numRunningMerges;
+ if (!waitingMergeOperations.isEmpty() && numRunningMerges < maxNumRunningMerges) {
+ doScheduleMerge(waitingMergeOperations.poll());
+ }
+ }
+ }
+
+ private void doScheduleMerge(ILSMIOOperation operation) {
+ ++numRunningMerges;
+ executor.submit(operation);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/GreedyScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/GreedyScheduler.java
index 742ae24..f3afa43 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/GreedyScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/GreedyScheduler.java
@@ -18,85 +18,141 @@
*/
package org.apache.hyracks.storage.am.lsm.common.impls;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ThreadFactory;
import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerFactory;
/**
- * This is a greedy asynchronous scheduler that always allocates the full bandwidth for the merge operation
- * with the smallest required disk bandwidth to minimize the number of disk components. It has been proven
- * that if the number of components in all merge operations are the same, then this scheduler is optimal
- * by always minimizing the number of disk components over time; if not, this is still a good heuristic
+ * Under the greedy scheduler, a merge operation has the following lifecycles. When the merge policy submits a
+ * merge operation to the greedy scheduler, the merge operation is SCHEDULED if the number of scheduled merge
+ * operations is smaller than maxNumScheduledMergeOperations; otherwise, the merge operation is WAITING and is
+ * stored into a queue. WAITING merge operations will be scheduled after some existing merge operations finish
+ * in a FIFO order.
+ *
+ * The greedy scheduler always runs at most one (and smallest) merge operation for each LSM-tree. The maximum number of
+ * running merge operations is controlled by maxNumRunningMergeOperations. A SCHEDULED merge operation can become
+ * RUNNING if the greedy scheduler resumes this merge operation, and a RUNNING merge operation can become SCHEDULED
+ * if the greedy scheduler pauses this merge operation.
*
*/
public class GreedyScheduler extends AbstractAsynchronousScheduler {
- public static final ILSMIOOperationSchedulerFactory FACTORY = new ILSMIOOperationSchedulerFactory() {
+ public static ILSMIOOperationSchedulerFactory FACTORY = new ILSMIOOperationSchedulerFactory() {
@Override
public ILSMIOOperationScheduler createIoScheduler(ThreadFactory threadFactory,
- IIoOperationFailedCallback callback) {
- return new GreedyScheduler(threadFactory, callback);
+ IIoOperationFailedCallback callback, int maxNumRunningFlushes, int maxNumScheduledMerges,
+ int maxNumRunningMerges) {
+ return new GreedyScheduler(threadFactory, callback, maxNumRunningFlushes, maxNumScheduledMerges,
+ maxNumRunningMerges);
}
+ @Override
public String getName() {
return "greedy";
}
};
- private final Map<String, List<ILSMIOOperation>> mergeOperations = new HashMap<>();
+ private final int maxNumScheduledMerges;
+ private final int maxNumRunningMerges;
- public GreedyScheduler(ThreadFactory threadFactory, IIoOperationFailedCallback callback) {
- super(threadFactory, callback);
+ private int numScheduledMerges;
+ private final Map<String, Set<ILSMIOOperation>> scheduledMergeOperations = new HashMap<>();
+ private final Map<String, ILSMIOOperation> runningMergeOperations = new HashMap<>();
+
+ public GreedyScheduler(ThreadFactory threadFactory, IIoOperationFailedCallback callback, int maxNumRunningFlushes,
+ int maxNumScheduledMerges, int maxNumRunningMerges) {
+ super(threadFactory, callback, maxNumRunningFlushes);
+ this.maxNumScheduledMerges = maxNumScheduledMerges;
+ this.maxNumRunningMerges = maxNumRunningMerges;
}
+ @Override
protected void scheduleMerge(ILSMIOOperation operation) {
operation.pause();
- String id = operation.getIndexIdentifier();
synchronized (executor) {
- List<ILSMIOOperation> mergeOpList = mergeOperations.computeIfAbsent(id, key -> new ArrayList<>());
- mergeOpList.add(operation);
- dispatchMergeOperation(mergeOpList);
+ if (numScheduledMerges >= maxNumScheduledMerges) {
+ waitingMergeOperations.add(operation);
+ } else {
+ doScheduleMerge(operation);
+ }
}
- executor.submit(operation);
}
- private void dispatchMergeOperation(List<ILSMIOOperation> mergeOps) {
- ILSMIOOperation activeOp = null;
+ private void doScheduleMerge(ILSMIOOperation operation) {
+ String indexIdentier = operation.getIndexIdentifier();
+ Set<ILSMIOOperation> mergeOps = scheduledMergeOperations.computeIfAbsent(indexIdentier, k -> new HashSet<>());
+ mergeOps.add(operation);
+ executor.submit(operation);
+ numScheduledMerges++;
+
+ dispatchMergeOperation(indexIdentier, mergeOps);
+ }
+
+ private void dispatchMergeOperation(String indexIdentier, Set<ILSMIOOperation> mergeOps) {
+ if (!runningMergeOperations.containsKey(indexIdentier)
+ && runningMergeOperations.size() >= maxNumRunningMerges) {
+ return;
+ }
+ ILSMIOOperation runningOp = null;
ILSMIOOperation smallestMergeOp = null;
for (ILSMIOOperation op : mergeOps) {
if (op.isActive()) {
- activeOp = op;
+ runningOp = op;
}
if (smallestMergeOp == null || op.getRemainingPages() < smallestMergeOp.getRemainingPages()) {
smallestMergeOp = op;
}
}
- if (smallestMergeOp != activeOp) {
- if (activeOp != null) {
- activeOp.pause();
+ if (smallestMergeOp != runningOp) {
+ if (runningOp != null) {
+ runningOp.pause();
}
smallestMergeOp.resume();
+ runningMergeOperations.put(indexIdentier, smallestMergeOp);
}
}
@Override
- public void completeOperation(ILSMIOOperation op) {
- if (op.getIOOpertionType() == LSMIOOperationType.MERGE) {
- String id = op.getIndexIdentifier();
- synchronized (executor) {
- List<ILSMIOOperation> mergeOpList = mergeOperations.get(id);
- mergeOpList.remove(op);
- if (!mergeOpList.isEmpty()) {
- dispatchMergeOperation(mergeOpList);
+ protected void completeMerge(ILSMIOOperation op) {
+ String id = op.getIndexIdentifier();
+ synchronized (executor) {
+ Set<ILSMIOOperation> mergeOperations = scheduledMergeOperations.get(id);
+ mergeOperations.remove(op);
+ if (mergeOperations.isEmpty()) {
+ scheduledMergeOperations.remove(id);
+ }
+ runningMergeOperations.remove(id);
+ numScheduledMerges--;
+
+ if (!waitingMergeOperations.isEmpty() && numScheduledMerges < maxNumScheduledMerges) {
+ doScheduleMerge(waitingMergeOperations.poll());
+ }
+ if (runningMergeOperations.size() < maxNumRunningMerges) {
+ String indexWithMostScheduledMerges = findIndexWithMostScheduledMerges();
+ if (indexWithMostScheduledMerges != null) {
+ dispatchMergeOperation(indexWithMostScheduledMerges,
+ scheduledMergeOperations.get(indexWithMostScheduledMerges));
}
}
}
}
+
+ private String findIndexWithMostScheduledMerges() {
+ String targetIndex = null;
+ int maxMerges = 0;
+ for (Map.Entry<String, Set<ILSMIOOperation>> e : scheduledMergeOperations.entrySet()) {
+ if (!runningMergeOperations.containsKey(e.getKey())
+ && (targetIndex == null || maxMerges < e.getValue().size())) {
+ targetIndex = e.getKey();
+ maxMerges = e.getValue().size();
+ }
+ }
+ return targetIndex;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IoOperationExecutor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IoOperationExecutor.java
index d5354ed..2a48627 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IoOperationExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IoOperationExecutor.java
@@ -18,7 +18,6 @@
*/
package org.apache.hyracks.storage.am.lsm.common.impls;
-import java.util.Deque;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.RunnableFuture;
@@ -40,16 +39,14 @@
private final IIoOperationFailedCallback callback;
private final Map<String, ILSMIOOperation> runningFlushOperations;
private final Map<String, Throwable> failedGroups;
- private final Map<String, Deque<ILSMIOOperation>> waitingFlushOperations;
public IoOperationExecutor(ThreadFactory threadFactory, ILSMIOOperationScheduler scheduler,
IIoOperationFailedCallback callback, Map<String, ILSMIOOperation> runningFlushOperations,
- Map<String, Deque<ILSMIOOperation>> waitingFlushOperations, Map<String, Throwable> failedGroups) {
+ Map<String, Throwable> failedGroups) {
super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), threadFactory);
this.scheduler = scheduler;
this.callback = callback;
this.runningFlushOperations = runningFlushOperations;
- this.waitingFlushOperations = waitingFlushOperations;
this.failedGroups = failedGroups;
}
@@ -80,20 +77,6 @@
executedOp.complete(); // destroy if merge or successful flush
}
scheduler.completeOperation(executedOp);
- if (executedOp.getIOOpertionType() == LSMIOOperationType.FLUSH) {
- String id = executedOp.getIndexIdentifier();
- synchronized (this) {
- runningFlushOperations.remove(id);
- if (waitingFlushOperations.containsKey(id)) {
- ILSMIOOperation op = waitingFlushOperations.get(id).poll();
- if (op != null) {
- scheduler.scheduleOperation(op);
- } else {
- waitingFlushOperations.remove(id);
- }
- }
- }
- }
}
private void fail(ILSMIOOperation executedOp, Throwable t) throws HyracksDataException {
@@ -106,16 +89,6 @@
String id = executedOp.getIndexIdentifier();
failedGroups.put(id, t);
runningFlushOperations.remove(id);
- if (waitingFlushOperations.containsKey(id)) {
- Deque<ILSMIOOperation> ops = waitingFlushOperations.remove(id);
- ILSMIOOperation next = ops.poll();
- while (next != null) {
- next.setFailure(new RuntimeException("Operation group " + id + " has permanently failed", t));
- next.setStatus(LSMIOOperationStatus.FAILURE);
- next.complete();
- next = ops.poll();
- }
- }
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java
index 7351bdf..036ade2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java
@@ -158,4 +158,9 @@
return false;
}
+ @Override
+ public boolean isCompleted() {
+ return true;
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
index 8adf5f7..4ab57c5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
@@ -194,4 +194,9 @@
public boolean isActive() {
return ioOp.isActive();
}
+
+ @Override
+ public boolean isCompleted() {
+ return ioOp.isCompleted();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeComponentLifecycleTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeComponentLifecycleTest.java
index b4e4d84..7f8fd8a 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeComponentLifecycleTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeComponentLifecycleTest.java
@@ -250,7 +250,8 @@
public void operationFailed(ILSMIOOperation operation, Throwable failure) {
LOGGER.log(Level.ERROR, "Operation {} failed", operation, failure);
}
- }), new EncapsulatingIoCallbackFactory(harness.getIOOperationCallbackFactory(), NoOpTestCallback.get(),
+ }, Integer.MAX_VALUE, Integer.MAX_VALUE),
+ new EncapsulatingIoCallbackFactory(harness.getIOOperationCallbackFactory(), NoOpTestCallback.get(),
NoOpTestCallback.get(), new ITestOpCallback<ILSMIOOperation>() {
@Override
public void before(ILSMIOOperation t) throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
index 3f36a34..f487bf1 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
@@ -121,7 +121,7 @@
public void schedulerFailed(ILSMIOOperationScheduler scheduler, Throwable failure) {
ExitUtil.exit(ExitUtil.EC_IO_SCHEDULER_FAILED);
}
- });
+ }, Integer.MAX_VALUE, Integer.MAX_VALUE);
lsmtree = LSMBTreeUtil.createLSMTree(ioManager, virtualBufferCaches, file, bufferCache, typeTraits,
cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, new NoMergePolicy(),
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/GreedySchedulerTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/GreedySchedulerTest.java
deleted file mode 100644
index d03f7a5..0000000
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/GreedySchedulerTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.storage.am.lsm.common.test;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
-import org.apache.hyracks.storage.am.lsm.common.impls.GreedyScheduler;
-import org.apache.hyracks.storage.am.lsm.common.impls.NoOpIoOperationFailedCallback;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-public class GreedySchedulerTest {
-
- private static final String INDEX_1 = "index1";
- private static final String INDEX_2 = "index2";
-
- private final Object lock = new Object();
-
- @Test
- public void test() throws Exception {
- GreedyScheduler scheduler = new GreedyScheduler(r -> new Thread(r), NoOpIoOperationFailedCallback.INSTANCE);
- AtomicBoolean active1 = new AtomicBoolean(true);
- ILSMIOOperation op1 = mockMergeOperation(INDEX_1, 10, active1);
-
- scheduler.scheduleOperation(op1);
- // op1 is activated
- Assert.assertTrue(active1.get());
-
- AtomicBoolean active2 = new AtomicBoolean(true);
- ILSMIOOperation op2 = mockMergeOperation(INDEX_2, 5, active2);
- scheduler.scheduleOperation(op2);
- // op2 does not interactive with op1s
- Assert.assertTrue(active1.get());
- Assert.assertTrue(active2.get());
-
- scheduler.completeOperation(op2);
- Assert.assertTrue(active1.get());
-
- AtomicBoolean active3 = new AtomicBoolean(true);
- ILSMIOOperation op3 = mockMergeOperation(INDEX_1, 5, active3);
- scheduler.scheduleOperation(op3);
- Assert.assertTrue(active3.get());
- Assert.assertFalse(active1.get());
-
- AtomicBoolean active4 = new AtomicBoolean(true);
- ILSMIOOperation op4 = mockMergeOperation(INDEX_1, 7, active4);
- scheduler.scheduleOperation(op4);
- // op3 is still active
- Assert.assertFalse(active1.get());
- Assert.assertTrue(active3.get());
- Assert.assertFalse(active4.get());
-
- // suppose op1 is completed (though unlikely in practice), now op3 is still active
- scheduler.completeOperation(op1);
- Assert.assertTrue(active3.get());
- Assert.assertFalse(active4.get());
-
- // op3 completed, op4 is active
- scheduler.completeOperation(op3);
- Assert.assertTrue(active4.get());
-
- synchronized (lock) {
- lock.notifyAll();
- }
- scheduler.close();
- }
-
- private ILSMIOOperation mockMergeOperation(String index, long remainingPages, AtomicBoolean isActive)
- throws HyracksDataException {
- ILSMIOOperation mergeOp = Mockito.mock(ILSMIOOperation.class);
- Mockito.when(mergeOp.getIndexIdentifier()).thenReturn(index);
- Mockito.when(mergeOp.getIOOpertionType()).thenReturn(LSMIOOperationType.MERGE);
- Mockito.when(mergeOp.getRemainingPages()).thenReturn(remainingPages);
-
- Mockito.doAnswer(new Answer<Boolean>() {
- @Override
- public Boolean answer(InvocationOnMock invocation) throws Throwable {
- return isActive.get();
- }
- }).when(mergeOp).isActive();
- Mockito.doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- isActive.set(true);
- return null;
- }
- }).when(mergeOp).resume();
-
- Mockito.doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- isActive.set(false);
- return null;
- }
- }).when(mergeOp).pause();
-
- Mockito.doAnswer(new Answer<LSMIOOperationStatus>() {
- @Override
- public LSMIOOperationStatus answer(InvocationOnMock invocation) throws Throwable {
- synchronized (lock) {
- lock.wait();
- }
- return LSMIOOperationStatus.SUCCESS;
- }
- }).when(mergeOp).call();
- return mergeOp;
-
- }
-
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/IoSchedulerTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/IoSchedulerTest.java
new file mode 100644
index 0000000..15f65a4
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/IoSchedulerTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.test;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
+import org.apache.hyracks.storage.am.lsm.common.impls.AsynchronousScheduler;
+import org.apache.hyracks.storage.am.lsm.common.impls.GreedyScheduler;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoOpIoOperationFailedCallback;
+import org.apache.hyracks.storage.am.lsm.common.test.IoSchedulerTest.MockedOperation;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class IoSchedulerTest {
+
+ protected static final String INDEX_1 = "index1";
+ protected static final String INDEX_2 = "index2";
+ protected static final String INDEX_3 = "index3";
+ protected static final String INDEX_4 = "index4";
+
+ protected static class MockedOperation {
+ public final ILSMIOOperation operation;
+ public final AtomicBoolean scheduled = new AtomicBoolean();
+ public final AtomicBoolean running = new AtomicBoolean();
+
+ public final Semaphore completedSemaphore = new Semaphore(0);
+
+ public MockedOperation(ILSMIOOperation mergeOp) {
+ this.operation = mergeOp;
+ }
+
+ public void waitForScheduled() throws InterruptedException {
+ synchronized (scheduled) {
+ while (!scheduled.get()) {
+ scheduled.wait();
+ }
+ }
+ }
+
+ public void waitForRunning() throws InterruptedException {
+ synchronized (running) {
+ while (!running.get()) {
+ running.wait();
+ }
+ }
+ }
+
+ }
+
+ @Test
+ public void testFlush() throws Exception {
+ int maxRunningFlushes = 2;
+
+ AsynchronousScheduler scheduler = (AsynchronousScheduler) AsynchronousScheduler.FACTORY
+ .createIoScheduler(r -> new Thread(r), NoOpIoOperationFailedCallback.INSTANCE, maxRunningFlushes, 0, 0);
+
+ MockedOperation op1_1 = mockFlushOperation(INDEX_1);
+ scheduler.scheduleOperation(op1_1.operation);
+ op1_1.waitForScheduled();
+
+ MockedOperation op1_2 = mockFlushOperation(INDEX_1);
+ scheduler.scheduleOperation(op1_2.operation);
+ Assert.assertFalse(op1_2.scheduled.get());
+
+ MockedOperation op2_1 = mockFlushOperation(INDEX_2);
+ scheduler.scheduleOperation(op2_1.operation);
+ op2_1.waitForScheduled();
+
+ MockedOperation op2_2 = mockFlushOperation(INDEX_2);
+ scheduler.scheduleOperation(op2_2.operation);
+ Assert.assertFalse(op2_2.scheduled.get());
+
+ // complete op1_1
+ op1_1.completedSemaphore.release();
+ op1_2.waitForScheduled();
+
+ // complete op1_2
+ op1_2.completedSemaphore.release();
+ Assert.assertFalse(op2_2.scheduled.get());
+
+ // complete op2_1
+ op2_1.completedSemaphore.release();
+ op2_2.waitForScheduled();
+
+ scheduler.close();
+ }
+
+ @Test
+ public void testAsynchronousMerge() throws Exception {
+ int maxRunningMerges = 2;
+
+ AsynchronousScheduler scheduler =
+ (AsynchronousScheduler) AsynchronousScheduler.FACTORY.createIoScheduler(r -> new Thread(r),
+ NoOpIoOperationFailedCallback.INSTANCE, 0, maxRunningMerges, maxRunningMerges);
+
+ MockedOperation op1 = mockMergeOperation(INDEX_1, 10);
+ scheduler.scheduleOperation(op1.operation);
+ // op1 is scheduled
+ op1.waitForScheduled();
+
+ MockedOperation op2 = mockMergeOperation(INDEX_2, 10);
+ scheduler.scheduleOperation(op2.operation);
+ // op2 is scheduled
+ op2.waitForScheduled();
+
+ MockedOperation op3 = mockMergeOperation(INDEX_3, 10);
+ scheduler.scheduleOperation(op3.operation);
+ // op3 is waiting
+ Assert.assertFalse(op3.scheduled.get());
+ Assert.assertFalse(op3.running.get());
+
+ MockedOperation op4 = mockMergeOperation(INDEX_4, 10);
+ scheduler.scheduleOperation(op4.operation);
+ // op4 is waiting
+ Assert.assertFalse(op4.scheduled.get());
+ Assert.assertFalse(op4.running.get());
+
+ // complete op2 and wait for op3
+ op2.completedSemaphore.release();
+ op3.waitForScheduled();
+
+ // complete op3 and wait for op4
+ op3.completedSemaphore.release();
+ op4.waitForScheduled();
+
+ scheduler.close();
+ }
+
+ @Test
+ public void testGreedyMerge() throws Exception {
+ int maxScheduledMerges = 5;
+ int maxRunningMerges = 2;
+
+ GreedyScheduler scheduler = (GreedyScheduler) GreedyScheduler.FACTORY.createIoScheduler(r -> new Thread(r),
+ NoOpIoOperationFailedCallback.INSTANCE, 0, maxScheduledMerges, maxRunningMerges);
+
+ MockedOperation op1_1 = mockMergeOperation(INDEX_1, 10);
+ scheduler.scheduleOperation(op1_1.operation);
+ // op1_1 is running
+ op1_1.waitForScheduled();
+ op1_1.waitForRunning();
+
+ MockedOperation op2 = mockMergeOperation(INDEX_2, 10);
+ scheduler.scheduleOperation(op2.operation);
+ // op2 is running
+ op2.waitForScheduled();
+ op2.waitForRunning();
+
+ MockedOperation op3_1 = mockMergeOperation(INDEX_3, 10);
+ scheduler.scheduleOperation(op3_1.operation);
+ // op3_1 is scheduled, but not running
+ op3_1.waitForScheduled();
+ Assert.assertFalse(op3_1.running.get());
+
+ MockedOperation op3_2 = mockMergeOperation(INDEX_3, 5);
+ scheduler.scheduleOperation(op3_2.operation);
+ // op3_2 is scheduled, but not running
+ op3_2.waitForScheduled();
+ Assert.assertFalse(op3_2.running.get());
+
+ MockedOperation op4 = mockMergeOperation(INDEX_4, 10);
+ scheduler.scheduleOperation(op4.operation);
+ // op4 is scheduled, but not running
+ op4.waitForScheduled();
+ Assert.assertFalse(op4.running.get());
+
+ MockedOperation op1_2 = mockMergeOperation(INDEX_1, 5);
+ scheduler.scheduleOperation(op1_2.operation);
+ // op1_2 is waiting, not scheduled
+ Assert.assertFalse(op1_2.scheduled.get());
+ Assert.assertFalse(op1_2.running.get());
+
+ // complete op2
+ op2.completedSemaphore.release();
+
+ // op1_2 preempts op1_1 because op1_2 is smaller
+ op1_2.waitForRunning();
+ op1_2.waitForScheduled();
+
+ // op3_2 is running because index3 has more merges than index4
+ op3_2.waitForRunning();
+ Assert.assertFalse(op3_1.running.get());
+
+ scheduler.close();
+ }
+
+ protected MockedOperation mockMergeOperation(String index, long remainingPages) throws HyracksDataException {
+ return mockOperation(index, LSMIOOperationType.MERGE, remainingPages);
+ }
+
+ protected MockedOperation mockFlushOperation(String index) throws HyracksDataException {
+ return mockOperation(index, LSMIOOperationType.FLUSH, 0);
+ }
+
+ protected MockedOperation mockOperation(String index, LSMIOOperationType type, long remainingPages)
+ throws HyracksDataException {
+ ILSMIOOperation op = Mockito.mock(ILSMIOOperation.class);
+ MockedOperation mockedOp = new MockedOperation(op);
+ Mockito.when(op.getIndexIdentifier()).thenReturn(index);
+ Mockito.when(op.getIOOpertionType()).thenReturn(type);
+ Mockito.when(op.getRemainingPages()).thenReturn(remainingPages);
+
+ Mockito.doAnswer(new Answer<Boolean>() {
+ @Override
+ public Boolean answer(InvocationOnMock invocation) throws Throwable {
+ return mockedOp.running.get();
+ }
+ }).when(op).isActive();
+ Mockito.doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ mockedOp.running.set(true);
+ synchronized (mockedOp.running) {
+ mockedOp.running.notifyAll();
+ }
+ return null;
+ }
+ }).when(op).resume();
+
+ Mockito.doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ mockedOp.running.set(false);
+ return null;
+ }
+ }).when(op).pause();
+
+ Mockito.doAnswer(new Answer<LSMIOOperationStatus>() {
+ @Override
+ public LSMIOOperationStatus answer(InvocationOnMock invocation) throws Throwable {
+ mockedOp.scheduled.set(true);
+ synchronized (mockedOp.scheduled) {
+ mockedOp.scheduled.notifyAll();
+ }
+ mockedOp.completedSemaphore.acquire();
+ return LSMIOOperationStatus.SUCCESS;
+ }
+ }).when(op).call();
+ return mockedOp;
+
+ }
+
+}