[ASTERIXDB-2813] Limit the number of flush/merge threads

- user model changes: no
- storage format changes: no
- interface changes: yes.

Details:
- Limit the number of flush/merge threads by introducing
the following parameters.
- storage.max.running.flushes.per.partition: the maximum
number of running flushes for each partition.
- storage.max.scheduled.merge.per.partition: the maximum
number of scheduled merges for each partition. This is
mainly used by the greedy scheduler.
- storage.max.running.merges.per.partition: the maximum
number of running mergese per partition.
- Basically, we limit the number of flush/merge threads
and put newly created flush/merge opreations into a wait
queue if the limit is reached.
- For the greedy scheduler, the scheduled merges
(i.e., merge threads) are more than the running merges
so that the scheduler can pick the smallest merge
for each LSM-tree.

Change-Id: I85a55423a1438b1d534c2e6a5968e675a99884c8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/9183
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index e058d39..a9a3a3e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -582,20 +582,26 @@
 
     private ILSMIOOperationScheduler createIoScheduler(StorageProperties properties) {
         String schedulerName = storageProperties.getIoScheduler();
+        int numPartitions = ioManager.getIODevices().size();
+
+        int maxRunningFlushes = storageProperties.getMaxRunningFlushes(numPartitions);
+        int maxScheduledMerges = storageProperties.getMaxScheduledMerges(numPartitions);
+        int maxRunningMerges = storageProperties.getMaxRunningMerges(numPartitions);
+
         ILSMIOOperationScheduler ioScheduler = null;
         if (AsynchronousScheduler.FACTORY.getName().equalsIgnoreCase(schedulerName)) {
             ioScheduler = AsynchronousScheduler.FACTORY.createIoScheduler(getServiceContext().getThreadFactory(),
-                    HaltCallback.INSTANCE);
+                    HaltCallback.INSTANCE, maxRunningFlushes, maxScheduledMerges, maxRunningMerges);
         } else if (GreedyScheduler.FACTORY.getName().equalsIgnoreCase(schedulerName)) {
             ioScheduler = GreedyScheduler.FACTORY.createIoScheduler(getServiceContext().getThreadFactory(),
-                    HaltCallback.INSTANCE);
+                    HaltCallback.INSTANCE, maxRunningFlushes, maxScheduledMerges, maxRunningMerges);
         } else {
             if (LOGGER.isWarnEnabled()) {
                 LOGGER.log(Level.WARN,
                         "Unknown storage I/O scheduler: " + schedulerName + "; defaulting to greedy I/O scheduler.");
             }
             ioScheduler = GreedyScheduler.FACTORY.createIoScheduler(getServiceContext().getThreadFactory(),
-                    HaltCallback.INSTANCE);
+                    HaltCallback.INSTANCE, maxRunningFlushes, maxScheduledMerges, maxRunningMerges);
         }
         return ioScheduler;
     }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
index fe73baf..c3a6839 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
@@ -173,7 +173,7 @@
                     public void operationFailed(ILSMIOOperation operation, Throwable t) {
                         LOGGER.warn("IO Operation failed", t);
                     }
-                }));
+                }, Integer.MAX_VALUE, Integer.MAX_VALUE));
         dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
index a99a306..fc33b1a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
@@ -55,7 +55,10 @@
         STORAGE_COMPRESSION_BLOCK(STRING, "snappy"),
         STORAGE_DISK_FORCE_BYTES(LONG_BYTE_UNIT, StorageUtil.getLongSizeInBytes(16, MEGABYTE)),
         STORAGE_IO_SCHEDULER(STRING, "greedy"),
-        STORAGE_WRITE_RATE_LIMIT(LONG_BYTE_UNIT, 0l);
+        STORAGE_WRITE_RATE_LIMIT(LONG_BYTE_UNIT, 0l),
+        STORAGE_MAX_RUNNING_FLUSHES_PER_PARTITION(NONNEGATIVE_INTEGER, 2),
+        STORAGE_MAX_SCHEDULED_MERGES_PER_PARTITION(NONNEGATIVE_INTEGER, 8),
+        STORAGE_MAX_RUNNING_MERGES_PER_PARTITION(NONNEGATIVE_INTEGER, 2);
 
         private final IOptionType interpreter;
         private final Object defaultValue;
@@ -111,6 +114,12 @@
                     return "The number of bytes before each disk force (fsync)";
                 case STORAGE_IO_SCHEDULER:
                     return "The I/O scheduler for LSM flush and merge operations";
+                case STORAGE_MAX_RUNNING_FLUSHES_PER_PARTITION:
+                    return "The maximum number of running flushes per partition (0 means unlimited)";
+                case STORAGE_MAX_SCHEDULED_MERGES_PER_PARTITION:
+                    return "The maximum number of scheduled merges per partition (0 means unlimited)";
+                case STORAGE_MAX_RUNNING_MERGES_PER_PARTITION:
+                    return "The maximum number of running merges per partition (0 means unlimited)";
                 default:
                     throw new IllegalStateException("NYI: " + this);
             }
@@ -204,6 +213,21 @@
         return accessor.getString(Option.STORAGE_IO_SCHEDULER);
     }
 
+    public int getMaxRunningFlushes(int numPartitions) {
+        int value = accessor.getInt(Option.STORAGE_MAX_RUNNING_FLUSHES_PER_PARTITION);
+        return value != 0 ? value * numPartitions : Integer.MAX_VALUE;
+    }
+
+    public int getMaxScheduledMerges(int numPartitions) {
+        int value = accessor.getInt(Option.STORAGE_MAX_SCHEDULED_MERGES_PER_PARTITION);
+        return value != 0 ? value * numPartitions : Integer.MAX_VALUE;
+    }
+
+    public int getMaxRunningMerges(int numPartitions) {
+        int value = accessor.getInt(Option.STORAGE_MAX_RUNNING_MERGES_PER_PARTITION);
+        return value != 0 ? value * numPartitions : Integer.MAX_VALUE;
+    }
+
     protected int getMetadataDatasets() {
         return MetadataIndexImmutableProperties.METADATA_DATASETS_COUNT;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
index 753d27a..40998b7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
@@ -177,4 +177,8 @@
      */
     boolean isActive();
 
+    /**
+     * @return whether this IO operation is completed
+     */
+    boolean isCompleted();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationSchedulerFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationSchedulerFactory.java
index 1c8a4e1..36bfc5a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationSchedulerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationSchedulerFactory.java
@@ -21,7 +21,8 @@
 import java.util.concurrent.ThreadFactory;
 
 public interface ILSMIOOperationSchedulerFactory {
-    ILSMIOOperationScheduler createIoScheduler(ThreadFactory threadFactory, IIoOperationFailedCallback callback);
+    ILSMIOOperationScheduler createIoScheduler(ThreadFactory threadFactory, IIoOperationFailedCallback callback,
+            int maxNumRunningFlushes, int maxNumScheduledMerges, int maxNumRunningMerges);
 
     String getName();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java
index 78185f0..e266a6f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java
@@ -27,6 +27,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
@@ -34,13 +35,18 @@
 
 public abstract class AbstractAsynchronousScheduler implements ILSMIOOperationScheduler, Closeable {
     protected final ExecutorService executor;
+
+    private final int maxNumFlushes;
     protected final Map<String, ILSMIOOperation> runningFlushOperations = new HashMap<>();
-    protected final Map<String, Deque<ILSMIOOperation>> waitingFlushOperations = new HashMap<>();
+    protected final Deque<ILSMIOOperation> waitingFlushOperations = new ArrayDeque<>();
+    protected final Deque<ILSMIOOperation> waitingMergeOperations = new ArrayDeque<>();
+
     protected final Map<String, Throwable> failedGroups = new HashMap<>();
 
-    public AbstractAsynchronousScheduler(ThreadFactory threadFactory, final IIoOperationFailedCallback callback) {
-        executor = new IoOperationExecutor(threadFactory, this, callback, runningFlushOperations,
-                waitingFlushOperations, failedGroups);
+    public AbstractAsynchronousScheduler(ThreadFactory threadFactory, final IIoOperationFailedCallback callback,
+            int maxNumFlushes) {
+        executor = new IoOperationExecutor(threadFactory, this, callback, runningFlushOperations, failedGroups);
+        this.maxNumFlushes = maxNumFlushes;
     }
 
     @Override
@@ -61,27 +67,35 @@
         }
     }
 
+    @Override
+    public void completeOperation(ILSMIOOperation operation) throws HyracksDataException {
+        switch (operation.getIOOpertionType()) {
+            case FLUSH:
+                completeFlush(operation);
+                break;
+            case MERGE:
+                completeMerge(operation);
+            case NOOP:
+                return;
+            default:
+                // this should never happen
+                // just guard here to avoid silent failures in case of future extensions
+                throw new IllegalArgumentException("Unknown operation type " + operation.getIOOpertionType());
+        }
+    }
+
     protected abstract void scheduleMerge(ILSMIOOperation operation);
 
+    protected abstract void completeMerge(ILSMIOOperation operation);
+
     protected void scheduleFlush(ILSMIOOperation operation) {
         String id = operation.getIndexIdentifier();
         synchronized (executor) {
-            if (failedGroups.containsKey(id)) {
-                // Group failure. Fail the operation right away
-                operation.setStatus(LSMIOOperationStatus.FAILURE);
-                operation.setFailure(new RuntimeException("Operation group " + id + " has permanently failed",
-                        failedGroups.get(id)));
-                operation.complete();
+            if (checkFailedFlush(operation)) {
                 return;
             }
-            if (runningFlushOperations.containsKey(id)) {
-                if (waitingFlushOperations.containsKey(id)) {
-                    waitingFlushOperations.get(id).offer(operation);
-                } else {
-                    Deque<ILSMIOOperation> q = new ArrayDeque<>();
-                    q.offer(operation);
-                    waitingFlushOperations.put(id, q);
-                }
+            if (runningFlushOperations.size() >= maxNumFlushes || runningFlushOperations.containsKey(id)) {
+                waitingFlushOperations.add(operation);
             } else {
                 runningFlushOperations.put(id, operation);
                 executor.submit(operation);
@@ -89,6 +103,52 @@
         }
     }
 
+    private boolean checkFailedFlush(ILSMIOOperation operation) {
+        String id = operation.getIndexIdentifier();
+        if (failedGroups.containsKey(id)) {
+            // Group failure. Fail the operation right away
+            operation.setStatus(LSMIOOperationStatus.FAILURE);
+            operation.setFailure(
+                    new RuntimeException("Operation group " + id + " has permanently failed", failedGroups.get(id)));
+            operation.complete();
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private void completeFlush(ILSMIOOperation operation) {
+        String id = operation.getIndexIdentifier();
+        synchronized (executor) {
+            runningFlushOperations.remove(id);
+
+            // Schedule flushes in FIFO order. Must make sure that there is at most one scheduled flush for each index.
+            for (ILSMIOOperation flushOp : waitingFlushOperations) {
+                String flushOpId = flushOp.getIndexIdentifier();
+                if (runningFlushOperations.size() < maxNumFlushes) {
+                    if (!runningFlushOperations.containsKey(flushOpId) && !flushOp.isCompleted()
+                            && !checkFailedFlush(flushOp)) {
+                        runningFlushOperations.put(flushOpId, flushOp);
+                        executor.submit(flushOp);
+                    }
+                } else {
+                    break;
+                }
+            }
+
+            // cleanup scheduled flushes
+            while (!waitingFlushOperations.isEmpty()) {
+                ILSMIOOperation top = waitingFlushOperations.peek();
+                if (top.isCompleted() || runningFlushOperations.get(top.getIndexIdentifier()) == top) {
+                    waitingFlushOperations.poll();
+                } else {
+                    break;
+                }
+            }
+
+        }
+    }
+
     @Override
     public void close() throws IOException {
         executor.shutdown();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
index 0938b5f..8317ca7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
@@ -202,6 +202,11 @@
         return isActive.get();
     }
 
+    @Override
+    public synchronized boolean isCompleted() {
+        return completed;
+    }
+
     public void waitIfPaused() throws HyracksDataException {
         synchronized (this) {
             while (!isActive.get()) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
index ac3481c..afd9a49 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
@@ -35,26 +35,49 @@
     public static final ILSMIOOperationSchedulerFactory FACTORY = new ILSMIOOperationSchedulerFactory() {
         @Override
         public ILSMIOOperationScheduler createIoScheduler(ThreadFactory threadFactory,
-                IIoOperationFailedCallback callback) {
-            return new AsynchronousScheduler(threadFactory, callback);
+                IIoOperationFailedCallback callback, int maxNumRunningFlushes, int maxNumScheduledMerges,
+                int maxNumRunningMerges) {
+            return new AsynchronousScheduler(threadFactory, callback, maxNumRunningFlushes, maxNumRunningMerges);
         }
 
+        @Override
         public String getName() {
             return "async";
         }
     };
 
-    public AsynchronousScheduler(ThreadFactory threadFactory, IIoOperationFailedCallback callback) {
-        super(threadFactory, callback);
+    private final int maxNumRunningMerges;
+    private int numRunningMerges = 0;
+
+    public AsynchronousScheduler(ThreadFactory threadFactory, IIoOperationFailedCallback callback,
+            int maxNumRunningFlushes, int maxNumRunningMerges) {
+        super(threadFactory, callback, maxNumRunningFlushes);
+        this.maxNumRunningMerges = maxNumRunningMerges;
     }
 
     @Override
     protected void scheduleMerge(ILSMIOOperation operation) {
-        executor.submit(operation);
+        synchronized (executor) {
+            if (numRunningMerges >= maxNumRunningMerges) {
+                waitingMergeOperations.add(operation);
+            } else {
+                doScheduleMerge(operation);
+            }
+        }
     }
 
     @Override
-    public void completeOperation(ILSMIOOperation operation) {
-        // no op
+    protected void completeMerge(ILSMIOOperation operation) {
+        synchronized (executor) {
+            --numRunningMerges;
+            if (!waitingMergeOperations.isEmpty() && numRunningMerges < maxNumRunningMerges) {
+                doScheduleMerge(waitingMergeOperations.poll());
+            }
+        }
+    }
+
+    private void doScheduleMerge(ILSMIOOperation operation) {
+        ++numRunningMerges;
+        executor.submit(operation);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/GreedyScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/GreedyScheduler.java
index 742ae24..f3afa43 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/GreedyScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/GreedyScheduler.java
@@ -18,85 +18,141 @@
  */
 package org.apache.hyracks.storage.am.lsm.common.impls;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ThreadFactory;
 
 import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerFactory;
 
 /**
- * This is a greedy asynchronous scheduler that always allocates the full bandwidth for the merge operation
- * with the smallest required disk bandwidth to minimize the number of disk components. It has been proven
- * that if the number of components in all merge operations are the same, then this scheduler is optimal
- * by always minimizing the number of disk components over time; if not, this is still a good heuristic
+ * Under the greedy scheduler, a merge operation has the following lifecycles. When the merge policy submits a
+ * merge operation to the greedy scheduler, the merge operation is SCHEDULED if the number of scheduled merge
+ * operations is smaller than maxNumScheduledMergeOperations; otherwise, the merge operation is WAITING and is
+ * stored into a queue. WAITING merge operations will be scheduled after some existing merge operations finish
+ * in a FIFO order.
+ *
+ * The greedy scheduler always runs at most one (and smallest) merge operation for each LSM-tree. The maximum number of
+ * running merge operations is controlled by maxNumRunningMergeOperations. A SCHEDULED merge operation can become
+ * RUNNING if the greedy scheduler resumes this merge operation, and a RUNNING merge operation can become SCHEDULED
+ * if the greedy scheduler pauses this merge operation.
  *
  */
 public class GreedyScheduler extends AbstractAsynchronousScheduler {
-    public static final ILSMIOOperationSchedulerFactory FACTORY = new ILSMIOOperationSchedulerFactory() {
+    public static ILSMIOOperationSchedulerFactory FACTORY = new ILSMIOOperationSchedulerFactory() {
         @Override
         public ILSMIOOperationScheduler createIoScheduler(ThreadFactory threadFactory,
-                IIoOperationFailedCallback callback) {
-            return new GreedyScheduler(threadFactory, callback);
+                IIoOperationFailedCallback callback, int maxNumRunningFlushes, int maxNumScheduledMerges,
+                int maxNumRunningMerges) {
+            return new GreedyScheduler(threadFactory, callback, maxNumRunningFlushes, maxNumScheduledMerges,
+                    maxNumRunningMerges);
         }
 
+        @Override
         public String getName() {
             return "greedy";
         }
     };
 
-    private final Map<String, List<ILSMIOOperation>> mergeOperations = new HashMap<>();
+    private final int maxNumScheduledMerges;
+    private final int maxNumRunningMerges;
 
-    public GreedyScheduler(ThreadFactory threadFactory, IIoOperationFailedCallback callback) {
-        super(threadFactory, callback);
+    private int numScheduledMerges;
+    private final Map<String, Set<ILSMIOOperation>> scheduledMergeOperations = new HashMap<>();
+    private final Map<String, ILSMIOOperation> runningMergeOperations = new HashMap<>();
+
+    public GreedyScheduler(ThreadFactory threadFactory, IIoOperationFailedCallback callback, int maxNumRunningFlushes,
+            int maxNumScheduledMerges, int maxNumRunningMerges) {
+        super(threadFactory, callback, maxNumRunningFlushes);
+        this.maxNumScheduledMerges = maxNumScheduledMerges;
+        this.maxNumRunningMerges = maxNumRunningMerges;
     }
 
+    @Override
     protected void scheduleMerge(ILSMIOOperation operation) {
         operation.pause();
-        String id = operation.getIndexIdentifier();
         synchronized (executor) {
-            List<ILSMIOOperation> mergeOpList = mergeOperations.computeIfAbsent(id, key -> new ArrayList<>());
-            mergeOpList.add(operation);
-            dispatchMergeOperation(mergeOpList);
+            if (numScheduledMerges >= maxNumScheduledMerges) {
+                waitingMergeOperations.add(operation);
+            } else {
+                doScheduleMerge(operation);
+            }
         }
-        executor.submit(operation);
     }
 
-    private void dispatchMergeOperation(List<ILSMIOOperation> mergeOps) {
-        ILSMIOOperation activeOp = null;
+    private void doScheduleMerge(ILSMIOOperation operation) {
+        String indexIdentier = operation.getIndexIdentifier();
+        Set<ILSMIOOperation> mergeOps = scheduledMergeOperations.computeIfAbsent(indexIdentier, k -> new HashSet<>());
+        mergeOps.add(operation);
+        executor.submit(operation);
+        numScheduledMerges++;
+
+        dispatchMergeOperation(indexIdentier, mergeOps);
+    }
+
+    private void dispatchMergeOperation(String indexIdentier, Set<ILSMIOOperation> mergeOps) {
+        if (!runningMergeOperations.containsKey(indexIdentier)
+                && runningMergeOperations.size() >= maxNumRunningMerges) {
+            return;
+        }
+        ILSMIOOperation runningOp = null;
         ILSMIOOperation smallestMergeOp = null;
         for (ILSMIOOperation op : mergeOps) {
             if (op.isActive()) {
-                activeOp = op;
+                runningOp = op;
             }
             if (smallestMergeOp == null || op.getRemainingPages() < smallestMergeOp.getRemainingPages()) {
                 smallestMergeOp = op;
             }
         }
-        if (smallestMergeOp != activeOp) {
-            if (activeOp != null) {
-                activeOp.pause();
+        if (smallestMergeOp != runningOp) {
+            if (runningOp != null) {
+                runningOp.pause();
             }
             smallestMergeOp.resume();
+            runningMergeOperations.put(indexIdentier, smallestMergeOp);
         }
     }
 
     @Override
-    public void completeOperation(ILSMIOOperation op) {
-        if (op.getIOOpertionType() == LSMIOOperationType.MERGE) {
-            String id = op.getIndexIdentifier();
-            synchronized (executor) {
-                List<ILSMIOOperation> mergeOpList = mergeOperations.get(id);
-                mergeOpList.remove(op);
-                if (!mergeOpList.isEmpty()) {
-                    dispatchMergeOperation(mergeOpList);
+    protected void completeMerge(ILSMIOOperation op) {
+        String id = op.getIndexIdentifier();
+        synchronized (executor) {
+            Set<ILSMIOOperation> mergeOperations = scheduledMergeOperations.get(id);
+            mergeOperations.remove(op);
+            if (mergeOperations.isEmpty()) {
+                scheduledMergeOperations.remove(id);
+            }
+            runningMergeOperations.remove(id);
+            numScheduledMerges--;
+
+            if (!waitingMergeOperations.isEmpty() && numScheduledMerges < maxNumScheduledMerges) {
+                doScheduleMerge(waitingMergeOperations.poll());
+            }
+            if (runningMergeOperations.size() < maxNumRunningMerges) {
+                String indexWithMostScheduledMerges = findIndexWithMostScheduledMerges();
+                if (indexWithMostScheduledMerges != null) {
+                    dispatchMergeOperation(indexWithMostScheduledMerges,
+                            scheduledMergeOperations.get(indexWithMostScheduledMerges));
                 }
             }
         }
     }
+
+    private String findIndexWithMostScheduledMerges() {
+        String targetIndex = null;
+        int maxMerges = 0;
+        for (Map.Entry<String, Set<ILSMIOOperation>> e : scheduledMergeOperations.entrySet()) {
+            if (!runningMergeOperations.containsKey(e.getKey())
+                    && (targetIndex == null || maxMerges < e.getValue().size())) {
+                targetIndex = e.getKey();
+                maxMerges = e.getValue().size();
+            }
+        }
+        return targetIndex;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IoOperationExecutor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IoOperationExecutor.java
index d5354ed..2a48627 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IoOperationExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IoOperationExecutor.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hyracks.storage.am.lsm.common.impls;
 
-import java.util.Deque;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.RunnableFuture;
@@ -40,16 +39,14 @@
     private final IIoOperationFailedCallback callback;
     private final Map<String, ILSMIOOperation> runningFlushOperations;
     private final Map<String, Throwable> failedGroups;
-    private final Map<String, Deque<ILSMIOOperation>> waitingFlushOperations;
 
     public IoOperationExecutor(ThreadFactory threadFactory, ILSMIOOperationScheduler scheduler,
             IIoOperationFailedCallback callback, Map<String, ILSMIOOperation> runningFlushOperations,
-            Map<String, Deque<ILSMIOOperation>> waitingFlushOperations, Map<String, Throwable> failedGroups) {
+            Map<String, Throwable> failedGroups) {
         super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), threadFactory);
         this.scheduler = scheduler;
         this.callback = callback;
         this.runningFlushOperations = runningFlushOperations;
-        this.waitingFlushOperations = waitingFlushOperations;
         this.failedGroups = failedGroups;
     }
 
@@ -80,20 +77,6 @@
             executedOp.complete(); // destroy if merge or successful flush
         }
         scheduler.completeOperation(executedOp);
-        if (executedOp.getIOOpertionType() == LSMIOOperationType.FLUSH) {
-            String id = executedOp.getIndexIdentifier();
-            synchronized (this) {
-                runningFlushOperations.remove(id);
-                if (waitingFlushOperations.containsKey(id)) {
-                    ILSMIOOperation op = waitingFlushOperations.get(id).poll();
-                    if (op != null) {
-                        scheduler.scheduleOperation(op);
-                    } else {
-                        waitingFlushOperations.remove(id);
-                    }
-                }
-            }
-        }
     }
 
     private void fail(ILSMIOOperation executedOp, Throwable t) throws HyracksDataException {
@@ -106,16 +89,6 @@
                 String id = executedOp.getIndexIdentifier();
                 failedGroups.put(id, t);
                 runningFlushOperations.remove(id);
-                if (waitingFlushOperations.containsKey(id)) {
-                    Deque<ILSMIOOperation> ops = waitingFlushOperations.remove(id);
-                    ILSMIOOperation next = ops.poll();
-                    while (next != null) {
-                        next.setFailure(new RuntimeException("Operation group " + id + " has permanently failed", t));
-                        next.setStatus(LSMIOOperationStatus.FAILURE);
-                        next.complete();
-                        next = ops.poll();
-                    }
-                }
             }
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java
index 7351bdf..036ade2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java
@@ -158,4 +158,9 @@
         return false;
     }
 
+    @Override
+    public boolean isCompleted() {
+        return true;
+    }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
index 8adf5f7..4ab57c5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
@@ -194,4 +194,9 @@
     public boolean isActive() {
         return ioOp.isActive();
     }
+
+    @Override
+    public boolean isCompleted() {
+        return ioOp.isCompleted();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeComponentLifecycleTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeComponentLifecycleTest.java
index b4e4d84..7f8fd8a 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeComponentLifecycleTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeComponentLifecycleTest.java
@@ -250,7 +250,8 @@
                     public void operationFailed(ILSMIOOperation operation, Throwable failure) {
                         LOGGER.log(Level.ERROR, "Operation {} failed", operation, failure);
                     }
-                }), new EncapsulatingIoCallbackFactory(harness.getIOOperationCallbackFactory(), NoOpTestCallback.get(),
+                }, Integer.MAX_VALUE, Integer.MAX_VALUE),
+                new EncapsulatingIoCallbackFactory(harness.getIOOperationCallbackFactory(), NoOpTestCallback.get(),
                         NoOpTestCallback.get(), new ITestOpCallback<ILSMIOOperation>() {
                             @Override
                             public void before(ILSMIOOperation t) throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
index 3f36a34..f487bf1 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
@@ -121,7 +121,7 @@
             public void schedulerFailed(ILSMIOOperationScheduler scheduler, Throwable failure) {
                 ExitUtil.exit(ExitUtil.EC_IO_SCHEDULER_FAILED);
             }
-        });
+        }, Integer.MAX_VALUE, Integer.MAX_VALUE);
 
         lsmtree = LSMBTreeUtil.createLSMTree(ioManager, virtualBufferCaches, file, bufferCache, typeTraits,
                 cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, new NoMergePolicy(),
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/GreedySchedulerTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/GreedySchedulerTest.java
deleted file mode 100644
index d03f7a5..0000000
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/GreedySchedulerTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.storage.am.lsm.common.test;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
-import org.apache.hyracks.storage.am.lsm.common.impls.GreedyScheduler;
-import org.apache.hyracks.storage.am.lsm.common.impls.NoOpIoOperationFailedCallback;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-public class GreedySchedulerTest {
-
-    private static final String INDEX_1 = "index1";
-    private static final String INDEX_2 = "index2";
-
-    private final Object lock = new Object();
-
-    @Test
-    public void test() throws Exception {
-        GreedyScheduler scheduler = new GreedyScheduler(r -> new Thread(r), NoOpIoOperationFailedCallback.INSTANCE);
-        AtomicBoolean active1 = new AtomicBoolean(true);
-        ILSMIOOperation op1 = mockMergeOperation(INDEX_1, 10, active1);
-
-        scheduler.scheduleOperation(op1);
-        // op1 is activated
-        Assert.assertTrue(active1.get());
-
-        AtomicBoolean active2 = new AtomicBoolean(true);
-        ILSMIOOperation op2 = mockMergeOperation(INDEX_2, 5, active2);
-        scheduler.scheduleOperation(op2);
-        // op2 does not interactive with op1s
-        Assert.assertTrue(active1.get());
-        Assert.assertTrue(active2.get());
-
-        scheduler.completeOperation(op2);
-        Assert.assertTrue(active1.get());
-
-        AtomicBoolean active3 = new AtomicBoolean(true);
-        ILSMIOOperation op3 = mockMergeOperation(INDEX_1, 5, active3);
-        scheduler.scheduleOperation(op3);
-        Assert.assertTrue(active3.get());
-        Assert.assertFalse(active1.get());
-
-        AtomicBoolean active4 = new AtomicBoolean(true);
-        ILSMIOOperation op4 = mockMergeOperation(INDEX_1, 7, active4);
-        scheduler.scheduleOperation(op4);
-        // op3 is still active
-        Assert.assertFalse(active1.get());
-        Assert.assertTrue(active3.get());
-        Assert.assertFalse(active4.get());
-
-        // suppose op1 is completed (though unlikely in practice), now op3 is still active
-        scheduler.completeOperation(op1);
-        Assert.assertTrue(active3.get());
-        Assert.assertFalse(active4.get());
-
-        // op3 completed, op4 is active
-        scheduler.completeOperation(op3);
-        Assert.assertTrue(active4.get());
-
-        synchronized (lock) {
-            lock.notifyAll();
-        }
-        scheduler.close();
-    }
-
-    private ILSMIOOperation mockMergeOperation(String index, long remainingPages, AtomicBoolean isActive)
-            throws HyracksDataException {
-        ILSMIOOperation mergeOp = Mockito.mock(ILSMIOOperation.class);
-        Mockito.when(mergeOp.getIndexIdentifier()).thenReturn(index);
-        Mockito.when(mergeOp.getIOOpertionType()).thenReturn(LSMIOOperationType.MERGE);
-        Mockito.when(mergeOp.getRemainingPages()).thenReturn(remainingPages);
-
-        Mockito.doAnswer(new Answer<Boolean>() {
-            @Override
-            public Boolean answer(InvocationOnMock invocation) throws Throwable {
-                return isActive.get();
-            }
-        }).when(mergeOp).isActive();
-        Mockito.doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock invocation) throws Throwable {
-                isActive.set(true);
-                return null;
-            }
-        }).when(mergeOp).resume();
-
-        Mockito.doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock invocation) throws Throwable {
-                isActive.set(false);
-                return null;
-            }
-        }).when(mergeOp).pause();
-
-        Mockito.doAnswer(new Answer<LSMIOOperationStatus>() {
-            @Override
-            public LSMIOOperationStatus answer(InvocationOnMock invocation) throws Throwable {
-                synchronized (lock) {
-                    lock.wait();
-                }
-                return LSMIOOperationStatus.SUCCESS;
-            }
-        }).when(mergeOp).call();
-        return mergeOp;
-
-    }
-
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/IoSchedulerTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/IoSchedulerTest.java
new file mode 100644
index 0000000..15f65a4
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/IoSchedulerTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.test;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
+import org.apache.hyracks.storage.am.lsm.common.impls.AsynchronousScheduler;
+import org.apache.hyracks.storage.am.lsm.common.impls.GreedyScheduler;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoOpIoOperationFailedCallback;
+import org.apache.hyracks.storage.am.lsm.common.test.IoSchedulerTest.MockedOperation;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class IoSchedulerTest {
+
+    protected static final String INDEX_1 = "index1";
+    protected static final String INDEX_2 = "index2";
+    protected static final String INDEX_3 = "index3";
+    protected static final String INDEX_4 = "index4";
+
+    protected static class MockedOperation {
+        public final ILSMIOOperation operation;
+        public final AtomicBoolean scheduled = new AtomicBoolean();
+        public final AtomicBoolean running = new AtomicBoolean();
+
+        public final Semaphore completedSemaphore = new Semaphore(0);
+
+        public MockedOperation(ILSMIOOperation mergeOp) {
+            this.operation = mergeOp;
+        }
+
+        public void waitForScheduled() throws InterruptedException {
+            synchronized (scheduled) {
+                while (!scheduled.get()) {
+                    scheduled.wait();
+                }
+            }
+        }
+
+        public void waitForRunning() throws InterruptedException {
+            synchronized (running) {
+                while (!running.get()) {
+                    running.wait();
+                }
+            }
+        }
+
+    }
+
+    @Test
+    public void testFlush() throws Exception {
+        int maxRunningFlushes = 2;
+
+        AsynchronousScheduler scheduler = (AsynchronousScheduler) AsynchronousScheduler.FACTORY
+                .createIoScheduler(r -> new Thread(r), NoOpIoOperationFailedCallback.INSTANCE, maxRunningFlushes, 0, 0);
+
+        MockedOperation op1_1 = mockFlushOperation(INDEX_1);
+        scheduler.scheduleOperation(op1_1.operation);
+        op1_1.waitForScheduled();
+
+        MockedOperation op1_2 = mockFlushOperation(INDEX_1);
+        scheduler.scheduleOperation(op1_2.operation);
+        Assert.assertFalse(op1_2.scheduled.get());
+
+        MockedOperation op2_1 = mockFlushOperation(INDEX_2);
+        scheduler.scheduleOperation(op2_1.operation);
+        op2_1.waitForScheduled();
+
+        MockedOperation op2_2 = mockFlushOperation(INDEX_2);
+        scheduler.scheduleOperation(op2_2.operation);
+        Assert.assertFalse(op2_2.scheduled.get());
+
+        // complete op1_1
+        op1_1.completedSemaphore.release();
+        op1_2.waitForScheduled();
+
+        // complete op1_2
+        op1_2.completedSemaphore.release();
+        Assert.assertFalse(op2_2.scheduled.get());
+
+        // complete op2_1
+        op2_1.completedSemaphore.release();
+        op2_2.waitForScheduled();
+
+        scheduler.close();
+    }
+
+    @Test
+    public void testAsynchronousMerge() throws Exception {
+        int maxRunningMerges = 2;
+
+        AsynchronousScheduler scheduler =
+                (AsynchronousScheduler) AsynchronousScheduler.FACTORY.createIoScheduler(r -> new Thread(r),
+                        NoOpIoOperationFailedCallback.INSTANCE, 0, maxRunningMerges, maxRunningMerges);
+
+        MockedOperation op1 = mockMergeOperation(INDEX_1, 10);
+        scheduler.scheduleOperation(op1.operation);
+        // op1 is scheduled
+        op1.waitForScheduled();
+
+        MockedOperation op2 = mockMergeOperation(INDEX_2, 10);
+        scheduler.scheduleOperation(op2.operation);
+        // op2 is scheduled
+        op2.waitForScheduled();
+
+        MockedOperation op3 = mockMergeOperation(INDEX_3, 10);
+        scheduler.scheduleOperation(op3.operation);
+        // op3 is waiting
+        Assert.assertFalse(op3.scheduled.get());
+        Assert.assertFalse(op3.running.get());
+
+        MockedOperation op4 = mockMergeOperation(INDEX_4, 10);
+        scheduler.scheduleOperation(op4.operation);
+        // op4 is waiting
+        Assert.assertFalse(op4.scheduled.get());
+        Assert.assertFalse(op4.running.get());
+
+        // complete op2 and wait for op3
+        op2.completedSemaphore.release();
+        op3.waitForScheduled();
+
+        // complete op3 and wait for op4
+        op3.completedSemaphore.release();
+        op4.waitForScheduled();
+
+        scheduler.close();
+    }
+
+    @Test
+    public void testGreedyMerge() throws Exception {
+        int maxScheduledMerges = 5;
+        int maxRunningMerges = 2;
+
+        GreedyScheduler scheduler = (GreedyScheduler) GreedyScheduler.FACTORY.createIoScheduler(r -> new Thread(r),
+                NoOpIoOperationFailedCallback.INSTANCE, 0, maxScheduledMerges, maxRunningMerges);
+
+        MockedOperation op1_1 = mockMergeOperation(INDEX_1, 10);
+        scheduler.scheduleOperation(op1_1.operation);
+        // op1_1 is running
+        op1_1.waitForScheduled();
+        op1_1.waitForRunning();
+
+        MockedOperation op2 = mockMergeOperation(INDEX_2, 10);
+        scheduler.scheduleOperation(op2.operation);
+        // op2 is running
+        op2.waitForScheduled();
+        op2.waitForRunning();
+
+        MockedOperation op3_1 = mockMergeOperation(INDEX_3, 10);
+        scheduler.scheduleOperation(op3_1.operation);
+        // op3_1 is scheduled, but not running
+        op3_1.waitForScheduled();
+        Assert.assertFalse(op3_1.running.get());
+
+        MockedOperation op3_2 = mockMergeOperation(INDEX_3, 5);
+        scheduler.scheduleOperation(op3_2.operation);
+        // op3_2 is scheduled, but not running
+        op3_2.waitForScheduled();
+        Assert.assertFalse(op3_2.running.get());
+
+        MockedOperation op4 = mockMergeOperation(INDEX_4, 10);
+        scheduler.scheduleOperation(op4.operation);
+        // op4 is scheduled, but not running
+        op4.waitForScheduled();
+        Assert.assertFalse(op4.running.get());
+
+        MockedOperation op1_2 = mockMergeOperation(INDEX_1, 5);
+        scheduler.scheduleOperation(op1_2.operation);
+        // op1_2 is waiting, not scheduled
+        Assert.assertFalse(op1_2.scheduled.get());
+        Assert.assertFalse(op1_2.running.get());
+
+        // complete op2
+        op2.completedSemaphore.release();
+
+        // op1_2 preempts op1_1 because op1_2 is smaller
+        op1_2.waitForRunning();
+        op1_2.waitForScheduled();
+
+        // op3_2 is running because index3 has more merges than index4
+        op3_2.waitForRunning();
+        Assert.assertFalse(op3_1.running.get());
+
+        scheduler.close();
+    }
+
+    protected MockedOperation mockMergeOperation(String index, long remainingPages) throws HyracksDataException {
+        return mockOperation(index, LSMIOOperationType.MERGE, remainingPages);
+    }
+
+    protected MockedOperation mockFlushOperation(String index) throws HyracksDataException {
+        return mockOperation(index, LSMIOOperationType.FLUSH, 0);
+    }
+
+    protected MockedOperation mockOperation(String index, LSMIOOperationType type, long remainingPages)
+            throws HyracksDataException {
+        ILSMIOOperation op = Mockito.mock(ILSMIOOperation.class);
+        MockedOperation mockedOp = new MockedOperation(op);
+        Mockito.when(op.getIndexIdentifier()).thenReturn(index);
+        Mockito.when(op.getIOOpertionType()).thenReturn(type);
+        Mockito.when(op.getRemainingPages()).thenReturn(remainingPages);
+
+        Mockito.doAnswer(new Answer<Boolean>() {
+            @Override
+            public Boolean answer(InvocationOnMock invocation) throws Throwable {
+                return mockedOp.running.get();
+            }
+        }).when(op).isActive();
+        Mockito.doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                mockedOp.running.set(true);
+                synchronized (mockedOp.running) {
+                    mockedOp.running.notifyAll();
+                }
+                return null;
+            }
+        }).when(op).resume();
+
+        Mockito.doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                mockedOp.running.set(false);
+                return null;
+            }
+        }).when(op).pause();
+
+        Mockito.doAnswer(new Answer<LSMIOOperationStatus>() {
+            @Override
+            public LSMIOOperationStatus answer(InvocationOnMock invocation) throws Throwable {
+                mockedOp.scheduled.set(true);
+                synchronized (mockedOp.scheduled) {
+                    mockedOp.scheduled.notifyAll();
+                }
+                mockedOp.completedSemaphore.acquire();
+                return LSMIOOperationStatus.SUCCESS;
+            }
+        }).when(op).call();
+        return mockedOp;
+
+    }
+
+}