separated IO scheduling from flush and merge policies

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1569 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
index adacb2b..1687af8 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
@@ -19,7 +19,8 @@
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialFlushPolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialSchedulerProvider;
 import edu.uci.ics.hyracks.tests.am.common.LSMTreeOperatorTestHelper;
 
 public class LSMBTreeOperatorTestHelper extends LSMTreeOperatorTestHelper {
@@ -31,8 +32,9 @@
     }
 
     public IIndexDataflowHelperFactory createDataFlowHelperFactory() {
-        return new LSMBTreeDataflowHelperFactory(new SequentialFlushPolicyProvider(), new ConstantMergePolicyProvider(
-                MERGE_THRESHOLD));
+        return new LSMBTreeDataflowHelperFactory(
+                new ImmediateFlushPolicyProvider(SequentialSchedulerProvider.INSTANCE),
+                new ConstantMergePolicyProvider(SequentialSchedulerProvider.INSTANCE, MERGE_THRESHOLD));
     }
 
 }
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
index b0e220c..daa3381 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
@@ -20,7 +20,8 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialFlushPolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialSchedulerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.dataflow.LSMRTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.tests.am.common.LSMTreeOperatorTestHelper;
 
@@ -35,7 +36,8 @@
     public IIndexDataflowHelperFactory createDataFlowHelperFactory(
             IPrimitiveValueProviderFactory[] valueProviderFactories, IBinaryComparatorFactory[] btreeComparatorFactories) {
         return new LSMRTreeDataflowHelperFactory(valueProviderFactories, btreeComparatorFactories,
-                new SequentialFlushPolicyProvider(), new ConstantMergePolicyProvider(MERGE_THRESHOLD));
+                new ImmediateFlushPolicyProvider(SequentialSchedulerProvider.INSTANCE),
+                new ConstantMergePolicyProvider(SequentialSchedulerProvider.INSTANCE, MERGE_THRESHOLD));
     }
 
 }
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFlushPolicy.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFlushPolicy.java
index 1ab189c..a977911 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFlushPolicy.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFlushPolicy.java
@@ -2,5 +2,5 @@
 
 
 public interface ILSMFlushPolicy {
-    public void memoryComponentFull(ILSMIndex index);
+    public void memoryComponentExceededThreshold(ILSMIndex index);
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOScheduler.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOScheduler.java
new file mode 100644
index 0000000..866e208
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOScheduler.java
@@ -0,0 +1,7 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+public interface ILSMIOScheduler {
+    public void scheduleFlush(ILSMIndex index);
+
+    public void scheduleMerge(ILSMIndex index);
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOSchedulerProvider.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOSchedulerProvider.java
new file mode 100644
index 0000000..c1e8ecb
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOSchedulerProvider.java
@@ -0,0 +1,7 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import java.io.Serializable;
+
+public interface ILSMIOSchedulerProvider extends Serializable {
+    public ILSMIOScheduler getIOScheduler();
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
index ee1b246..ae940e4 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
@@ -2,5 +2,5 @@
 
 
 public interface ILSMMergePolicy {
-    public void componentAdded(ILSMIndex index, int totalNumDiskComponents, boolean mergeInProgress);
+    public void diskComponentAdded(ILSMIndex index, int totalNumDiskComponents);
 }
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
index e5f92a9..cff8d9b 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
@@ -1,44 +1,24 @@
 package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 
 public class ConstantMergePolicy implements ILSMMergePolicy {
 
-    private final ExecutorService executor;
+    private final ILSMIOScheduler ioScheduler;
 
     private final int threshold;
 
-    public ConstantMergePolicy(int threshold) {
-        this.executor = Executors.newCachedThreadPool();
+    public ConstantMergePolicy(ILSMIOScheduler ioScheduler, int threshold) {
+        this.ioScheduler = ioScheduler;
         this.threshold = threshold;
     }
 
     @Override
-    public void componentAdded(final ILSMIndex index, int totalNumDiskComponents, boolean mergeInProgress) {
-        synchronized (index) {
-            if (totalNumDiskComponents >= threshold && !mergeInProgress) {
-                executor.submit(new Runnable() {
-
-                    @Override
-                    public void run() {
-                        try {
-                            ((ILSMIndexAccessor) index.createAccessor()).merge();
-                        } catch (HyracksDataException e) {
-                            e.printStackTrace();
-                        } catch (IndexException e) {
-                            e.printStackTrace();
-                        }
-                    }
-                });
-
-            }
+    public void diskComponentAdded(final ILSMIndex index, int totalNumDiskComponents) {
+        if (totalNumDiskComponents >= threshold) {
+            ioScheduler.scheduleMerge(index);
         }
     }
 }
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyProvider.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyProvider.java
index 0d18077..c68b425 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyProvider.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyProvider.java
@@ -1,5 +1,6 @@
 package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
 
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOSchedulerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
 
@@ -7,15 +8,18 @@
 
     private static final long serialVersionUID = 1L;
 
+    private final ILSMIOSchedulerProvider schedulerProvider;
+
     private final int threshold;
 
-    public ConstantMergePolicyProvider(int threshold) {
+    public ConstantMergePolicyProvider(ILSMIOSchedulerProvider schedulerProvider, int threshold) {
+        this.schedulerProvider = schedulerProvider;
         this.threshold = threshold;
     }
 
     @Override
     public ILSMMergePolicy getMergePolicy() {
-        return new ConstantMergePolicy(threshold);
+        return new ConstantMergePolicy(schedulerProvider.getIOScheduler(), threshold);
     }
 
 }
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateFlushPolicy.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateFlushPolicy.java
new file mode 100644
index 0000000..0fb6545
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateFlushPolicy.java
@@ -0,0 +1,20 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+public class ImmediateFlushPolicy implements ILSMFlushPolicy {
+
+    private final ILSMIOScheduler ioScheduler;
+
+    public ImmediateFlushPolicy(ILSMIOScheduler ioScheduler) {
+        this.ioScheduler = ioScheduler;
+    }
+
+    @Override
+    public void memoryComponentExceededThreshold(final ILSMIndex index) {
+        // Schedule a flush immediately when the memory component is full
+        ioScheduler.scheduleFlush(index);
+    }
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateFlushPolicyProvider.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateFlushPolicyProvider.java
new file mode 100644
index 0000000..8e73c09
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateFlushPolicyProvider.java
@@ -0,0 +1,22 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOSchedulerProvider;
+
+public class ImmediateFlushPolicyProvider implements ILSMFlushPolicyProvider {
+
+    private static final long serialVersionUID = 1L;
+
+    private final ILSMIOSchedulerProvider schedulerProvider;
+
+    public ImmediateFlushPolicyProvider(ILSMIOSchedulerProvider schedulerProvider) {
+        this.schedulerProvider = schedulerProvider;
+    }
+
+    @Override
+    public ILSMFlushPolicy getFlushPolicy() {
+        return new ImmediateFlushPolicy(schedulerProvider.getIOScheduler());
+    }
+
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index b942d9e..7900b29 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -93,7 +93,7 @@
 
             // Flush will only be handled by last exiting thread.
             if (flushFlag && threadRefCount == 0) {
-                flushPolicy.memoryComponentFull(lsmIndex);
+                flushPolicy.memoryComponentExceededThreshold(lsmIndex);
             }
         }
     }
@@ -145,7 +145,7 @@
         lsmIndex.resetInMemoryComponent();
         synchronized (diskComponentsSync) {
             lsmIndex.addFlushedComponent(newComponent);
-            mergePolicy.componentAdded(lsmIndex, lsmIndex.getDiskComponents().size(), isMerging.get());
+            mergePolicy.diskComponentAdded(lsmIndex, lsmIndex.getDiskComponents().size());
         }
 
         // Unblock entering threads waiting for the flush
@@ -279,7 +279,7 @@
         lsmIndex.getComponentFinalizer().finalize(index);
         synchronized (diskComponentsSync) {
             lsmIndex.addFlushedComponent(index);
-            mergePolicy.componentAdded(lsmIndex, lsmIndex.getDiskComponents().size(), isMerging.get());
+            mergePolicy.diskComponentAdded(lsmIndex, lsmIndex.getDiskComponents().size());
         }
     }
 }
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
index d18436b..5d36c09 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
@@ -7,7 +7,7 @@
     INSTANCE;
 
     @Override
-    public void componentAdded(ILSMIndex index, int totalNumDiskComponents, boolean mergeInProgress) {
+    public void diskComponentAdded(ILSMIndex index, int totalNumDiskComponents) {
         // Do nothing
     }
 
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialFlushPolicy.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialFlushPolicy.java
deleted file mode 100644
index c7cbd18..0000000
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialFlushPolicy.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-
-public enum SequentialFlushPolicy implements ILSMFlushPolicy {
-    INSTANCE;
-
-    private static final Logger LOGGER = Logger.getLogger(SequentialFlushPolicy.class.getName());
-
-    private final ExecutorService executor;
-
-    private SequentialFlushPolicy() {
-        executor = Executors.newSingleThreadExecutor();
-    }
-
-    @Override
-    public void memoryComponentFull(final ILSMIndex index) {
-        executor.submit(new Runnable() {
-            public void run() {
-                try {
-                    LOGGER.info("flushing");
-                    ((ILSMIndexAccessor) index.createAccessor()).flush();
-                    LOGGER.info("finished flushing");
-                } catch (HyracksDataException e) {
-                    LOGGER.info(e.getMessage());
-                } catch (IndexException e) {
-                    LOGGER.info(e.getMessage());
-                }
-                LOGGER.info("Thread should end");
-            }
-        });
-    }
-}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialFlushPolicyProvider.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialFlushPolicyProvider.java
deleted file mode 100644
index e48b892..0000000
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialFlushPolicyProvider.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicyProvider;
-
-public class SequentialFlushPolicyProvider implements ILSMFlushPolicyProvider {
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public ILSMFlushPolicy getFlushPolicy() {
-        return SequentialFlushPolicy.INSTANCE;
-    }
-
-}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialScheduler.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialScheduler.java
new file mode 100644
index 0000000..8be892b
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialScheduler.java
@@ -0,0 +1,53 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+
+public enum SequentialScheduler implements ILSMIOScheduler {
+    INSTANCE;
+
+    private final ExecutorService executor = Executors.newSingleThreadExecutor();
+
+    @Override
+    public void scheduleFlush(final ILSMIndex index) {
+        executor.submit(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    ((ILSMIndexAccessor) index.createAccessor()).flush();
+                } catch (HyracksDataException e) {
+                    e.printStackTrace();
+                } catch (IndexException e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+    }
+
+    @Override
+    public void scheduleMerge(final ILSMIndex index) {
+        executor.submit(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    ((ILSMIndexAccessor) index.createAccessor()).merge();
+                } catch (LSMMergeInProgressException e) {
+                    // Ignore!
+                } catch (HyracksDataException e) {
+                    e.printStackTrace();
+                } catch (IndexException e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+    }
+
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialSchedulerProvider.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialSchedulerProvider.java
new file mode 100644
index 0000000..83e2136
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialSchedulerProvider.java
@@ -0,0 +1,14 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOSchedulerProvider;
+
+public enum SequentialSchedulerProvider implements ILSMIOSchedulerProvider {
+    INSTANCE;
+
+    @Override
+    public ILSMIOScheduler getIOScheduler() {
+        return SequentialScheduler.INSTANCE;
+    }
+
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java
index 91232d1..0b2c645 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java
@@ -27,8 +27,8 @@
 import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialFlushPolicy;
 
 public class LSMBTreeExamplesTest extends OrderedIndexExamplesTest {
     private final LSMBTreeTestHarness harness = new LSMBTreeTestHarness();
@@ -39,7 +39,7 @@
         return LSMBTreeUtils.createLSMTree(harness.getMemBufferCache(), harness.getMemOpCallback(),
                 harness.getMemFreePageManager(), harness.getIOManager(), harness.getOnDiskDir(),
                 harness.getDiskBufferCache(), harness.getDiskFileMapProvider(), typeTraits, cmpFactories,
-                SequentialFlushPolicy.INSTANCE, NoMergePolicy.INSTANCE);
+                new ImmediateFlushPolicy(harness.getIOScheduler()), NoMergePolicy.INSTANCE);
     }
 
     @Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java
index afe54a5..8dbfd75 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java
@@ -29,8 +29,8 @@
 import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialFlushPolicy;
 
 public class LSMBTreeMultiThreadTest extends OrderedIndexMultiThreadTest {
 
@@ -54,7 +54,7 @@
         return LSMBTreeUtils.createLSMTree(harness.getMemBufferCache(), harness.getMemOpCallback(),
                 harness.getMemFreePageManager(), harness.getIOManager(), harness.getOnDiskDir(),
                 harness.getDiskBufferCache(), harness.getDiskFileMapProvider(), typeTraits, cmpFactories,
-                SequentialFlushPolicy.INSTANCE, NoMergePolicy.INSTANCE);
+                new ImmediateFlushPolicy(harness.getIOScheduler()), NoMergePolicy.INSTANCE);
     }
 
     @Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
index 94bc112..d244af4 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
@@ -34,8 +34,9 @@
 import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialScheduler;
 import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -84,8 +85,8 @@
                 new LIFOMetaDataFrameFactory());
 
         lsmtree = LSMBTreeUtils.createLSMTree(memBufferCache, NoOpOperationCallback.INSTANCE, memFreePageManager,
-                ioManager, onDiskDir, bufferCache, fmp, typeTraits, cmpFactories, SequentialFlushPolicy.INSTANCE,
-                NoMergePolicy.INSTANCE);
+                ioManager, onDiskDir, bufferCache, fmp, typeTraits, cmpFactories, new ImmediateFlushPolicy(
+                        SequentialScheduler.INSTANCE), NoMergePolicy.INSTANCE);
     }
 
     @Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestContext.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestContext.java
index 1ff997d..c59e223 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestContext.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestContext.java
@@ -30,7 +30,8 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialScheduler;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
 
@@ -69,7 +70,7 @@
         IBinaryComparatorFactory[] cmpFactories = SerdeUtils.serdesToComparatorFactories(fieldSerdes, numKeyFields);
         LSMBTree lsmTree = LSMBTreeUtils.createLSMTree(memBufferCache, NoOpOperationCallback.INSTANCE,
                 memFreePageManager, ioManager, onDiskDir, diskBufferCache, diskFileMapProvider, typeTraits,
-                cmpFactories, SequentialFlushPolicy.INSTANCE, mergePolicy);
+                cmpFactories, new ImmediateFlushPolicy(SequentialScheduler.INSTANCE), mergePolicy);
         lsmTree.create(fileId);
         lsmTree.open(fileId);
         LSMBTreeTestContext testCtx = new LSMBTreeTestContext(fieldSerdes, lsmTree);
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java
index 63f3c75..7143448 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java
@@ -31,8 +31,10 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialScheduler;
 import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -41,9 +43,9 @@
 
 public class LSMBTreeTestHarness {
     protected static final Logger LOGGER = Logger.getLogger(LSMBTreeTestHarness.class.getName());
-    
+
     public static final BTreeLeafFrameType[] LEAF_FRAMES_TO_TEST = new BTreeLeafFrameType[] { BTreeLeafFrameType.REGULAR_NSM };
-    
+
     private static final long RANDOM_SEED = 50;
     private static final int DEFAULT_DISK_PAGE_SIZE = 256;
     private static final int DEFAULT_DISK_NUM_PAGES = 1000;
@@ -51,48 +53,45 @@
     private static final int DEFAULT_MEM_PAGE_SIZE = 256;
     private static final int DEFAULT_MEM_NUM_PAGES = 100;
     private static final int DEFAULT_HYRACKS_FRAME_SIZE = 128;
-    private static final int DUMMY_FILE_ID = -1;           
-    
+    private static final int DUMMY_FILE_ID = -1;
+
     protected final int diskPageSize;
     protected final int diskNumPages;
     protected final int diskMaxOpenFiles;
     protected final int memPageSize;
     protected final int memNumPages;
     protected final int hyracksFrameSize;
-    
+
     protected IOManager ioManager;
     protected IBufferCache diskBufferCache;
     protected IFileMapProvider diskFileMapProvider;
     protected InMemoryBufferCache memBufferCache;
     protected InMemoryFreePageManager memFreePageManager;
     protected IHyracksTaskContext ctx;
-    
+    protected ILSMIOScheduler ioScheduler;
+
     protected final Random rnd = new Random();
     protected final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
-    protected final static String sep = System.getProperty("file.separator");    
+    protected final static String sep = System.getProperty("file.separator");
     protected String onDiskDir;
-    
+
     public LSMBTreeTestHarness() {
-    	this.diskPageSize = DEFAULT_DISK_PAGE_SIZE;
-    	this.diskNumPages = DEFAULT_DISK_NUM_PAGES;
-    	this.diskMaxOpenFiles = DEFAULT_DISK_MAX_OPEN_FILES;
-    	this.memPageSize = DEFAULT_MEM_PAGE_SIZE;
-    	this.memNumPages = DEFAULT_MEM_NUM_PAGES;
-    	this.hyracksFrameSize = DEFAULT_HYRACKS_FRAME_SIZE;
+        this(DEFAULT_DISK_PAGE_SIZE, DEFAULT_DISK_NUM_PAGES, DEFAULT_DISK_MAX_OPEN_FILES, DEFAULT_MEM_PAGE_SIZE,
+                DEFAULT_MEM_NUM_PAGES, DEFAULT_HYRACKS_FRAME_SIZE);
     }
-    
-	public LSMBTreeTestHarness(int diskPageSize, int diskNumPages,
-			int diskMaxOpenFiles, int memPageSize, int memNumPages,
-			int hyracksFrameSize) {
-		this.diskPageSize = diskPageSize;
-		this.diskNumPages = diskNumPages;
-		this.diskMaxOpenFiles = diskMaxOpenFiles;
-		this.memPageSize = memPageSize;
-		this.memNumPages = memNumPages;
-		this.hyracksFrameSize = hyracksFrameSize;
-	}
-    
-    public void setUp() throws HyracksException {        
+
+    public LSMBTreeTestHarness(int diskPageSize, int diskNumPages, int diskMaxOpenFiles, int memPageSize,
+            int memNumPages, int hyracksFrameSize) {
+        this.diskPageSize = diskPageSize;
+        this.diskNumPages = diskNumPages;
+        this.diskMaxOpenFiles = diskMaxOpenFiles;
+        this.memPageSize = memPageSize;
+        this.memNumPages = memNumPages;
+        this.hyracksFrameSize = hyracksFrameSize;
+        this.ioScheduler = SequentialScheduler.INSTANCE;
+    }
+
+    public void setUp() throws HyracksException {
         onDiskDir = "lsm_btree_" + simpleDateFormat.format(new Date()) + sep;
         ctx = TestUtils.create(getHyracksFrameSize());
         TestStorageManagerComponentHolder.init(diskPageSize, diskNumPages, diskMaxOpenFiles);
@@ -103,10 +102,10 @@
         ioManager = TestStorageManagerComponentHolder.getIOManager();
         rnd.setSeed(RANDOM_SEED);
     }
-    
+
     public void tearDown() throws HyracksDataException {
         diskBufferCache.close();
-        for(IODeviceHandle dev : ioManager.getIODevices()) {            
+        for (IODeviceHandle dev : ioManager.getIODevices()) {
             File dir = new File(dev.getPath(), onDiskDir);
             FilenameFilter filter = new FilenameFilter() {
                 public boolean accept(File dir, String name) {
@@ -123,68 +122,72 @@
             dir.delete();
         }
     }
-    
+
     public int getDiskPageSize() {
         return diskPageSize;
     }
-    
+
     public int getDiskNumPages() {
         return diskNumPages;
     }
-    
+
     public int getDiskMaxOpenFiles() {
         return diskMaxOpenFiles;
     }
-    
+
     public int getMemPageSize() {
         return memPageSize;
     }
-    
+
     public int getMemNumPages() {
         return memNumPages;
     }
-    
+
     public int getHyracksFrameSize() {
         return hyracksFrameSize;
     }
-    
+
     public int getFileId() {
-    	return DUMMY_FILE_ID;
+        return DUMMY_FILE_ID;
     }
-    
+
     public IOManager getIOManager() {
         return ioManager;
     }
-    
+
     public IBufferCache getDiskBufferCache() {
-    	return diskBufferCache;
+        return diskBufferCache;
     }
-    
+
     public IFileMapProvider getDiskFileMapProvider() {
-    	return diskFileMapProvider;
+        return diskFileMapProvider;
     }
-    
+
     public InMemoryBufferCache getMemBufferCache() {
-    	return memBufferCache;
+        return memBufferCache;
     }
-    
+
     public InMemoryFreePageManager getMemFreePageManager() {
-    	return memFreePageManager;
+        return memFreePageManager;
     }
-    
+
     public IHyracksTaskContext getHyracksTastContext() {
-    	return ctx;
+        return ctx;
     }
-    
+
     public String getOnDiskDir() {
-    	return onDiskDir;
+        return onDiskDir;
     }
-    
+
     public Random getRandom() {
         return rnd;
     }
-    
+
     public IOperationCallback getMemOpCallback() {
         return NoOpOperationCallback.INSTANCE;
     }
+
+    public ILSMIOScheduler getIOScheduler() {
+        return ioScheduler;
+    }
 }
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeExamplesTest.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeExamplesTest.java
index 72703c8..fc80861 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeExamplesTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeExamplesTest.java
@@ -25,8 +25,8 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialFlushPolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeTestHarness;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.utils.LSMRTreeUtils;
 import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeExamplesTest;
@@ -41,7 +41,7 @@
         return LSMRTreeUtils.createLSMTree(harness.getMemBufferCache(), harness.getMemFreePageManager(),
                 harness.getIOManager(), harness.getOnDiskDir(), harness.getDiskBufferCache(),
                 harness.getDiskFileMapProvider(), typeTraits, rtreeCmpFactories, btreeCmpFactories,
-                valueProviderFactories, SequentialFlushPolicy.INSTANCE, NoMergePolicy.INSTANCE);
+                valueProviderFactories, new ImmediateFlushPolicy(harness.getIOScheduler()), NoMergePolicy.INSTANCE);
     }
 
     @Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeMultiThreadTest.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeMultiThreadTest.java
index 033200c..b689cf5 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeMultiThreadTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeMultiThreadTest.java
@@ -28,7 +28,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeTestHarness;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.utils.LSMRTreeUtils;
 import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeMultiThreadTest;
@@ -56,7 +56,7 @@
         return LSMRTreeUtils.createLSMTree(harness.getMemBufferCache(), harness.getMemFreePageManager(),
                 harness.getIOManager(), harness.getOnDiskDir(), harness.getDiskBufferCache(),
                 harness.getDiskFileMapProvider(), typeTraits, rtreeCmpFactories, btreeCmpFactories,
-                valueProviderFactories, SequentialFlushPolicy.INSTANCE, NoMergePolicy.INSTANCE);
+                valueProviderFactories, new ImmediateFlushPolicy(harness.getIOScheduler()), NoMergePolicy.INSTANCE);
 
     }
 
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestContext.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestContext.java
index 3ba6206..bf40384 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestContext.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestContext.java
@@ -27,7 +27,8 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTree;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.utils.LSMRTreeUtils;
 import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeTestContext;
@@ -75,7 +76,7 @@
                 fieldSerdes.length);
         LSMRTree lsmTree = LSMRTreeUtils.createLSMTree(memBufferCache, memFreePageManager, ioManager, onDiskDir,
                 diskBufferCache, diskFileMapProvider, typeTraits, rtreeCmpFactories, btreeCmpFactories,
-                valueProviderFactories, SequentialFlushPolicy.INSTANCE, mergePolicy);
+                valueProviderFactories, new ImmediateFlushPolicy(SequentialScheduler.INSTANCE), mergePolicy);
         lsmTree.create(fileId);
         lsmTree.open(fileId);
         LSMRTreeTestContext testCtx = new LSMRTreeTestContext(fieldSerdes, lsmTree);
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java
index bdde7ee..756c746 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java
@@ -28,8 +28,10 @@
 import edu.uci.ics.hyracks.api.io.IODeviceHandle;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
 import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeInMemoryBufferCache;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeInMemoryFreePageManager;
 import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
@@ -63,6 +65,7 @@
     protected LSMRTreeInMemoryBufferCache memBufferCache;
     protected LSMRTreeInMemoryFreePageManager memFreePageManager;
     protected IHyracksTaskContext ctx;
+    protected ILSMIOScheduler ioScheduler;
 
     protected final Random rnd = new Random();
     protected final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
@@ -70,12 +73,8 @@
     protected String onDiskDir;
 
     public LSMRTreeTestHarness() {
-        this.diskPageSize = DEFAULT_DISK_PAGE_SIZE;
-        this.diskNumPages = DEFAULT_DISK_NUM_PAGES;
-        this.diskMaxOpenFiles = DEFAULT_DISK_MAX_OPEN_FILES;
-        this.memPageSize = DEFAULT_MEM_PAGE_SIZE;
-        this.memNumPages = DEFAULT_MEM_NUM_PAGES;
-        this.hyracksFrameSize = DEFAULT_HYRACKS_FRAME_SIZE;
+        this(DEFAULT_DISK_PAGE_SIZE, DEFAULT_DISK_NUM_PAGES, DEFAULT_DISK_MAX_OPEN_FILES, DEFAULT_MEM_PAGE_SIZE,
+                DEFAULT_MEM_NUM_PAGES, DEFAULT_HYRACKS_FRAME_SIZE);
     }
 
     public LSMRTreeTestHarness(int diskPageSize, int diskNumPages, int diskMaxOpenFiles, int memPageSize,
@@ -86,6 +85,7 @@
         this.memPageSize = memPageSize;
         this.memNumPages = memNumPages;
         this.hyracksFrameSize = hyracksFrameSize;
+        this.ioScheduler = SequentialScheduler.INSTANCE;
     }
 
     public void setUp() throws HyracksException {
@@ -179,4 +179,8 @@
     public Random getRandom() {
         return rnd;
     }
+
+    public ILSMIOScheduler getIOScheduler() {
+        return ioScheduler;
+    }
 }