split flush and merge into two-part calls to allow for IO scheduling

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1736 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITreeIndex.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITreeIndex.java
index 1965f3f..cb8443d 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITreeIndex.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITreeIndex.java
@@ -60,10 +60,10 @@
      */
     public IBinaryComparatorFactory[] getComparatorFactories();
 
- 	/**
-	 * @param fillFactor
- 	 * @throws TreeIndexException if the user tries to instantiate a second bulk
-	 * loader
- 	 */
-	public IIndexBulkLoader createBulkLoader(float fillFactor) throws TreeIndexException;
+    /**
+     * @param fillFactor
+     * @throws TreeIndexException if the user tries to instantiate a second bulk
+     * loader
+     */
+    public IIndexBulkLoader createBulkLoader(float fillFactor) throws TreeIndexException;
 }
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 4a3f7bf..c0fe5af 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -52,13 +52,17 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponentFinalizer;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileManager;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMFlushOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMMergeOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexComponentFinalizer;
@@ -263,14 +267,15 @@
     }
 
     @Override
-    public ITreeIndex flush() throws HyracksDataException, IndexException {
+    public ITreeIndex flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
+        LSMFlushOperation flushOp = (LSMFlushOperation) operation;
         // Bulk load a new on-disk BTree from the in-memory BTree.        
         RangePredicate nullPred = new RangePredicate(null, null, true, true, null, null);
         ITreeIndexAccessor memBTreeAccessor = memBTree.createAccessor(NoOpOperationCallback.INSTANCE,
                 NoOpOperationCallback.INSTANCE);
         IIndexCursor scanCursor = memBTreeAccessor.createSearchCursor();
         memBTreeAccessor.search(scanCursor, nullPred);
-        BTree diskBTree = createFlushTarget();
+        BTree diskBTree = createDiskBTree(diskBTreeFactory, flushOp.getFlushTarget(), true);
         // Bulk load the tuples from the in-memory BTree into the new disk BTree.
         IIndexBulkLoader bulkLoader = diskBTree.createBulkLoader(1.0f);
         try {
@@ -302,23 +307,6 @@
         return createDiskBTree(bulkLoadBTreeFactory, fileRef, true);
     }
 
-    private BTree createFlushTarget() throws HyracksDataException {
-        String relFlushFileName = (String) fileManager.getRelFlushFileName();
-        FileReference fileRef = fileManager.createFlushFile(relFlushFileName);
-        return createDiskBTree(diskBTreeFactory, fileRef, true);
-    }
-
-    private BTree createMergeTarget(List<Object> mergingDiskBTrees) throws HyracksDataException {
-        BTree firstBTree = (BTree) mergingDiskBTrees.get(0);
-        BTree lastBTree = (BTree) mergingDiskBTrees.get(mergingDiskBTrees.size() - 1);
-        FileReference firstFile = diskFileMapProvider.lookupFileName(firstBTree.getFileId());
-        FileReference lastFile = diskFileMapProvider.lookupFileName(lastBTree.getFileId());
-        String relMergeFileName = (String) fileManager.getRelMergeFileName(firstFile.getFile().getName(), lastFile
-                .getFile().getName());
-        FileReference fileRef = fileManager.createMergeFile(relMergeFileName);
-        return createDiskBTree(diskBTreeFactory, fileRef, true);
-    }
-
     private BTree createDiskBTree(TreeFactory<BTree> factory, FileReference fileRef, boolean createBTree)
             throws HyracksDataException {
         // Create new BTree instance.
@@ -365,15 +353,12 @@
         lsmTreeCursor.initPriorityQueue();
     }
 
-    public ITreeIndex merge(List<Object> mergedComponents) throws HyracksDataException, IndexException {
-        LSMBTreeOpContext ctx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-        ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor();
-        RangePredicate rangePred = new RangePredicate(null, null, true, true, null, null);
-        // Ordered scan, ignoring the in-memory BTree.
-        // We get back a snapshot of the on-disk BTrees that are going to be
-        // merged now, so we can clean them up after the merge has completed.
-        List<Object> mergingDiskBTrees = lsmHarness.search(cursor, (RangePredicate) rangePred, ctx, false);
-        mergedComponents.addAll(mergingDiskBTrees);
+    public ITreeIndex merge(List<Object> mergedComponents, ILSMIOOperation operation) throws HyracksDataException,
+            IndexException {
+        LSMMergeOperation mergeOp = (LSMMergeOperation) operation;
+        ITreeIndexCursor cursor = mergeOp.getCursor();
+
+        mergedComponents.addAll(mergeOp.getMergingComponents());
 
         // Nothing to merge.
         if (mergedComponents.size() <= 1) {
@@ -382,7 +367,7 @@
         }
 
         // Bulk load the tuples from all on-disk BTrees into the new BTree.
-        BTree mergedBTree = createMergeTarget(mergingDiskBTrees);
+        BTree mergedBTree = createDiskBTree(diskBTreeFactory, mergeOp.getMergeTarget(), true);
         IIndexBulkLoader bulkLoader = mergedBTree.createBulkLoader(1.0f);
         try {
             while (cursor.hasNext()) {
@@ -518,6 +503,41 @@
             LSMBTreeOpContext concreteCtx = (LSMBTreeOpContext) ctx;
             return concreteCtx.cmp;
         }
+
+        @Override
+        public ILSMIOOperation createFlushOperation(ILSMIOOperationCallback callback) {
+            String relFlushFileName = (String) fileManager.getRelFlushFileName();
+            FileReference fileRef = fileManager.createFlushFile(relFlushFileName);
+            return new LSMFlushOperation(lsmHarness.getIndex(), fileRef, callback);
+        }
+    }
+
+    public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException {
+        LSMBTreeOpContext ctx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+        ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor();
+        RangePredicate rangePred = new RangePredicate(null, null, true, true, null, null);
+        // Ordered scan, ignoring the in-memory BTree.
+        // We get back a snapshot of the on-disk BTrees that are going to be
+        // merged now, so we can clean them up after the merge has completed.
+        List<Object> mergingDiskBTrees;
+        try {
+            mergingDiskBTrees = lsmHarness.search(cursor, (RangePredicate) rangePred, ctx, false);
+            if (mergingDiskBTrees.size() <= 1) {
+                cursor.close();
+                return null;
+            }
+        } catch (IndexException e) {
+            throw new HyracksDataException(e);
+        }
+
+        BTree firstBTree = (BTree) mergingDiskBTrees.get(0);
+        BTree lastBTree = (BTree) mergingDiskBTrees.get(mergingDiskBTrees.size() - 1);
+        FileReference firstFile = diskFileMapProvider.lookupFileName(firstBTree.getFileId());
+        FileReference lastFile = diskFileMapProvider.lookupFileName(lastBTree.getFileId());
+        String relMergeFileName = (String) fileManager.getRelMergeFileName(firstFile.getFile().getName(), lastFile
+                .getFile().getName());
+        FileReference fileRef = fileManager.createMergeFile(relMergeFileName);
+        return new LSMMergeOperation(lsmHarness.getIndex(), mergingDiskBTrees, cursor, fileRef, callback);
     }
 
     @Override
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index 95dea2f..a90b64e 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -43,13 +43,16 @@
     public void search(IIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred, IIndexOpContext ictx,
             boolean includeMemComponent, AtomicInteger searcherRefCount) throws HyracksDataException, IndexException;
 
-    public Object merge(List<Object> mergedComponents) throws HyracksDataException, IndexException;
+    public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException;
+
+    public Object merge(List<Object> mergedComponents, ILSMIOOperation operation) throws HyracksDataException,
+            IndexException;
 
     public void addMergedComponent(Object newComponent, List<Object> mergedComponents);
 
     public void cleanUpAfterMerge(List<Object> mergedComponents) throws HyracksDataException;
 
-    public Object flush() throws HyracksDataException, IndexException;
+    public Object flush(ILSMIOOperation operation) throws HyracksDataException, IndexException;
 
     public void addFlushedComponent(Object index);
 
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index 6c10d26..b768cfd 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -20,6 +20,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMMergeInProgressException;
 
 /**
  * Client handle for performing operations
@@ -29,13 +30,18 @@
  * concurrent operations).
  */
 public interface ILSMIndexAccessor extends IIndexAccessor {
+    public ILSMIOOperation createFlushOperation(ILSMIOOperationCallback callback);
+
+    public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException,
+            LSMMergeInProgressException;
+
     /**
      * Force a flush of the in-memory component.
      * 
      * @throws HyracksDataException
      * @throws TreeIndexException
      */
-    public void flush() throws HyracksDataException, IndexException;
+    public void flush(ILSMIOOperation operation) throws HyracksDataException, IndexException;
 
     /**
      * Merge all on-disk components.
@@ -43,7 +49,7 @@
      * @throws HyracksDataException
      * @throws TreeIndexException
      */
-    public void merge() throws HyracksDataException, IndexException;
+    public void merge(ILSMIOOperation operation) throws HyracksDataException, IndexException;
 
     /**
      * Deletes the tuple from the memory component only.
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
index ae940e4..2a1f0e7 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
@@ -1,6 +1,8 @@
 package edu.uci.ics.hyracks.storage.am.lsm.common.api;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
 
 public interface ILSMMergePolicy {
-    public void diskComponentAdded(ILSMIndex index, int totalNumDiskComponents);
+    public void diskComponentAdded(ILSMIndex index, int totalNumDiskComponents) throws HyracksDataException;
 }
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallback.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallback.java
new file mode 100644
index 0000000..0caef1a
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallback.java
@@ -0,0 +1,16 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+
+public class BlockingIOOperationCallback implements ILSMIOOperationCallback {
+
+    @Override
+    public void callback() {
+        this.notifyAll();
+    }
+
+    public void block() throws InterruptedException {
+        this.wait();
+    }
+
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
index 1f63cd3..8140b0e 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
@@ -1,7 +1,11 @@
 package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 
 public class ConstantMergePolicy implements ILSMMergePolicy {
@@ -16,9 +20,19 @@
     }
 
     @Override
-    public void diskComponentAdded(final ILSMIndex index, int totalNumDiskComponents) {
+    public void diskComponentAdded(final ILSMIndex index, int totalNumDiskComponents) throws HyracksDataException {
         if (totalNumDiskComponents >= threshold) {
-            ioScheduler.scheduleOperation(new LSMMergeOperation(index));
+            ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+                    NoOpOperationCallback.INSTANCE);
+            ILSMIOOperation op;
+            try {
+                op = accessor.createMergeOperation(NoOpIOOperationCallback.INSTANCE);
+                if (op != null) {
+                    ioScheduler.scheduleOperation(op);
+                }
+            } catch (LSMMergeInProgressException e) {
+                // Do nothing
+            }
         }
     }
 }
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMFlushOperation.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMFlushOperation.java
index 1368899..0c70bc1 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMFlushOperation.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMFlushOperation.java
@@ -4,6 +4,7 @@
 import java.util.List;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.io.IODeviceHandle;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -15,9 +16,13 @@
 public class LSMFlushOperation implements ILSMIOOperation {
 
     private final ILSMIndex index;
+    private final FileReference flushTarget;
+    private final ILSMIOOperationCallback callback;
 
-    public LSMFlushOperation(ILSMIndex index) {
+    public LSMFlushOperation(ILSMIndex index, FileReference flushTarget, ILSMIOOperationCallback callback) {
         this.index = index;
+        this.flushTarget = flushTarget;
+        this.callback = callback;
     }
 
     @Override
@@ -27,20 +32,23 @@
 
     @Override
     public List<IODeviceHandle> getWriteDevices() {
-        // TODO Auto-generated method stub
-        return null;
+        return Collections.singletonList(flushTarget.getDevideHandle());
     }
 
     @Override
     public void perform() throws HyracksDataException, IndexException {
         ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
                 NoOpOperationCallback.INSTANCE);
-        accessor.flush();
+        accessor.flush(this);
     }
 
     @Override
     public ILSMIOOperationCallback getCallback() {
-        return NoOpIOOperationCallback.INSTANCE;
+        return callback;
+    }
+
+    public FileReference getFlushTarget() {
+        return flushTarget;
     }
 
 }
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 2fe1093..44f305e 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -29,6 +29,8 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
@@ -101,11 +103,11 @@
         }
     }
 
-    public void flush() throws HyracksDataException, IndexException {
+    public void flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Flushing LSM-Tree.");
         }
-        Object newComponent = lsmIndex.flush();
+        Object newComponent = lsmIndex.flush(operation);
 
         // The implementation of this call must take any necessary steps to make
         // the new component permanent, and mark it as valid (usually this means
@@ -150,12 +152,21 @@
         return diskComponentSnapshot;
     }
 
-    public void merge() throws HyracksDataException, IndexException {
+    public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws LSMMergeInProgressException,
+            HyracksDataException {
         if (!isMerging.compareAndSet(false, true)) {
             throw new LSMMergeInProgressException(
                     "Merge already in progress. Only one merge process allowed at a time.");
         }
 
+        ILSMIOOperation mergeOp = lsmIndex.createMergeOperation(callback);
+        if (mergeOp == null) {
+            isMerging.set(false);
+        }
+        return mergeOp;
+    }
+
+    public void merge(ILSMIOOperation operation) throws HyracksDataException, IndexException {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Merging LSM-Tree.");
         }
@@ -165,7 +176,7 @@
         AtomicInteger localSearcherRefCount = searcherRefCount;
 
         List<Object> mergedComponents = new ArrayList<Object>();
-        Object newComponent = lsmIndex.merge(mergedComponents);
+        Object newComponent = lsmIndex.merge(mergedComponents, operation);
 
         // No merge happened.
         if (newComponent == null) {
@@ -246,4 +257,8 @@
     public ILSMIOOperationScheduler getIOScheduler() {
         return ioScheduler;
     }
+
+    public ILSMIndex getIndex() {
+        return lsmIndex;
+    }
 }
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMMergeOperation.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMMergeOperation.java
index 2f82b7a..a38f82b 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMMergeOperation.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMMergeOperation.java
@@ -1,10 +1,15 @@
 package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.impls.AbstractTreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -14,33 +19,56 @@
 public class LSMMergeOperation implements ILSMIOOperation {
 
     private final ILSMIndex index;
+    private final List<Object> mergingComponents;
+    private final ITreeIndexCursor cursor;
+    private final FileReference mergeTarget;
+    private final ILSMIOOperationCallback callback;
 
-    public LSMMergeOperation(ILSMIndex index) {
+    public LSMMergeOperation(ILSMIndex index, List<Object> mergingComponents, ITreeIndexCursor cursor,
+            FileReference mergeTarget, ILSMIOOperationCallback callback) {
         this.index = index;
+        this.mergingComponents = mergingComponents;
+        this.cursor = cursor;
+        this.mergeTarget = mergeTarget;
+        this.callback = callback;
     }
 
     @Override
     public List<IODeviceHandle> getReadDevices() {
-        // TODO Auto-generated method stub
-        return null;
+        List<IODeviceHandle> devs = new ArrayList<IODeviceHandle>();
+        for (Object o : mergingComponents) {
+            AbstractTreeIndex idx = (AbstractTreeIndex) o;
+            devs.add(idx.getFileReference().getDevideHandle());
+        }
+        return devs;
     }
 
     @Override
     public List<IODeviceHandle> getWriteDevices() {
-        // TODO Auto-generated method stub
-        return null;
+        return Collections.singletonList(mergeTarget.getDevideHandle());
     }
 
     @Override
     public void perform() throws HyracksDataException, IndexException {
         ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
                 NoOpOperationCallback.INSTANCE);
-        accessor.merge();
+        accessor.merge(this);
     }
 
     @Override
     public ILSMIOOperationCallback getCallback() {
-        return NoOpIOOperationCallback.INSTANCE;
+        return callback;
     }
 
+    public FileReference getMergeTarget() {
+        return mergeTarget;
+    }
+
+    public ITreeIndexCursor getCursor() {
+        return cursor;
+    }
+
+    public List<Object> getMergingComponents() {
+        return mergingComponents;
+    }
 }
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index 1630d2c..8676b27 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -22,6 +22,8 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 
 public abstract class LSMTreeIndexAccessor implements ILSMIndexAccessor {
@@ -51,7 +53,7 @@
         ctx.reset(IndexOp.DELETE);
         lsmHarness.insertUpdateOrDelete(tuple, ctx);
     }
-    
+
     @Override
     public void upsert(ITupleReference tuple) throws HyracksDataException, IndexException {
         ctx.reset(IndexOp.UPSERT);
@@ -65,18 +67,24 @@
     }
 
     @Override
-    public void flush() throws HyracksDataException, IndexException {
-        lsmHarness.flush();
+    public void flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
+        lsmHarness.flush(operation);
     }
 
     @Override
-    public void merge() throws HyracksDataException, IndexException {
-        lsmHarness.merge();
+    public void merge(ILSMIOOperation operation) throws HyracksDataException, IndexException {
+        lsmHarness.merge(operation);
     }
-    
+
     @Override
     public void physicalDelete(ITupleReference tuple) throws HyracksDataException, IndexException {
         ctx.reset(IndexOp.PHYSICALDELETE);
         lsmHarness.insertUpdateOrDelete(tuple, ctx);
     }
+
+    @Override
+    public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException,
+            LSMMergeInProgressException {
+        return lsmHarness.createMergeOperation(callback);
+    }
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTracker.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTracker.java
index 12dc42d..cd1d394 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTracker.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTracker.java
@@ -1,6 +1,8 @@
 package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
 
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 
 public class RefCountingOperationTracker implements ILSMOperationTracker {
@@ -34,7 +36,10 @@
 
             // Flush will only be handled by last exiting thread.
             if (index.getFlushController().getFlushStatus(index) && threadRefCount == 0) {
-                index.getIOScheduler().scheduleOperation(new LSMFlushOperation(index));
+                ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+                        NoOpOperationCallback.INSTANCE);
+                index.getIOScheduler().scheduleOperation(
+                        accessor.createFlushOperation(NoOpIOOperationCallback.INSTANCE));
             }
         }
     }
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index d6ba98f..59e74ba 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -43,6 +43,8 @@
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileManager;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -183,7 +185,8 @@
     }
 
     @Override
-    public Object flush() throws HyracksDataException, IndexException {
+    public Object flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
+        LSMRTreeFlushOperation flushOp = (LSMRTreeFlushOperation) operation;
         // Renaming order is critical because we use assume ordering when we
         // read the file names when we open the tree.
         // The RTree should be renamed before the BTree.
@@ -194,9 +197,7 @@
         RTreeSearchCursor rtreeScanCursor = (RTreeSearchCursor) memRTreeAccessor.createSearchCursor();
         SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
         memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
-        LSMRTreeFileNameComponent fileNames = (LSMRTreeFileNameComponent) fileManager.getRelFlushFileName();
-        FileReference rtreeFile = fileManager.createFlushFile(fileNames.getRTreeFileName());
-        RTree diskRTree = (RTree) createDiskTree(diskRTreeFactory, rtreeFile, true);
+        RTree diskRTree = (RTree) createDiskTree(diskRTreeFactory, flushOp.getRTreeFlushTarget(), true);
         IIndexBulkLoader rTreeBulkloader;
         ITreeIndexCursor cursor;
 
@@ -247,8 +248,7 @@
         IIndexCursor btreeScanCursor = memBTreeAccessor.createSearchCursor();
         RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null, null);
         memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate);
-        FileReference btreeFile = fileManager.createFlushFile(fileNames.getBTreeFileName());
-        BTree diskBTree = (BTree) createDiskTree(diskBTreeFactory, btreeFile, true);
+        BTree diskBTree = (BTree) createDiskTree(diskBTreeFactory, flushOp.getBTreeFlushTarget(), true);
 
         // BulkLoad the tuples from the in-memory tree into the new disk BTree.
         IIndexBulkLoader bTreeBulkloader = diskBTree.createBulkLoader(1.0f);
@@ -266,18 +266,11 @@
     }
 
     @Override
-    public Object merge(List<Object> mergedComponents) throws HyracksDataException, IndexException {
-        // Renaming order is critical because we use assume ordering when we
-        // read the file names when we open the tree.
-        // The RTree should be renamed before the BTree.
-
-        IIndexOpContext ctx = createOpContext();
-        ITreeIndexCursor cursor;
-        cursor = new LSMRTreeSortedCursor(linearizer);
-        ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
-        // Scan the RTrees, ignoring the in-memory RTree.
-        List<Object> mergingComponents = lsmHarness.search(cursor, rtreeSearchPred, ctx, false);
-        mergedComponents.addAll(mergingComponents);
+    public Object merge(List<Object> mergedComponents, ILSMIOOperation operation) throws HyracksDataException,
+            IndexException {
+        LSMRTreeMergeOperation mergeOp = (LSMRTreeMergeOperation) operation;
+        ITreeIndexCursor cursor = mergeOp.getCursor();
+        mergedComponents.addAll(mergeOp.getMergingComponents());
 
         // Nothing to merge.
         if (mergedComponents.size() <= 1) {
@@ -286,11 +279,8 @@
         }
 
         // Bulk load the tuples from all on-disk RTrees into the new RTree.
-        LSMRTreeFileNameComponent fileNames = getMergeTargetFileName(mergingComponents);
-        FileReference rtreeFile = fileManager.createMergeFile(fileNames.getRTreeFileName());
-        FileReference btreeFile = fileManager.createMergeFile(fileNames.getBTreeFileName());
-        RTree mergedRTree = (RTree) createDiskTree(diskRTreeFactory, rtreeFile, true);
-        BTree mergedBTree = (BTree) createDiskTree(diskBTreeFactory, btreeFile, true);
+        RTree mergedRTree = (RTree) createDiskTree(diskRTreeFactory, mergeOp.getRTreeMergeTarget(), true);
+        BTree mergedBTree = (BTree) createDiskTree(diskBTreeFactory, mergeOp.getBTreeMergeTarget(), true);
 
         IIndexBulkLoader bulkloader = mergedRTree.createBulkLoader(1.0f);
         try {
@@ -326,6 +316,35 @@
     }
 
     @Override
+    public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException {
+        // Renaming order is critical because we use assume ordering when we
+        // read the file names when we open the tree.
+        // The RTree should be renamed before the BTree.
+        IIndexOpContext ctx = createOpContext();
+        ITreeIndexCursor cursor;
+        cursor = new LSMRTreeSortedCursor(linearizer);
+        ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
+        // Scan the RTrees, ignoring the in-memory RTree.
+        List<Object> mergingComponents;
+        try {
+            mergingComponents = lsmHarness.search(cursor, rtreeSearchPred, ctx, false);
+        } catch (IndexException e) {
+            throw new HyracksDataException(e);
+        }
+        // Nothing to merge.
+        if (mergingComponents.size() <= 1) {
+            cursor.close();
+            return null;
+        }
+        LSMRTreeFileNameComponent fileNames = getMergeTargetFileName(mergingComponents);
+        FileReference rtreeFile = fileManager.createMergeFile(fileNames.getRTreeFileName());
+        FileReference btreeFile = fileManager.createMergeFile(fileNames.getBTreeFileName());
+
+        return new LSMRTreeMergeOperation(lsmHarness.getIndex(), mergingComponents, cursor, rtreeFile, btreeFile,
+                callback);
+    }
+
+    @Override
     public IIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
             ISearchOperationCallback searchCallback) {
         return new LSMRTreeAccessor(lsmHarness, createOpContext());
@@ -345,6 +364,14 @@
             LSMRTreeOpContext concreteCtx = (LSMRTreeOpContext) ctx;
             return concreteCtx.rtreeOpContext.cmp;
         }
+
+        @Override
+        public ILSMIOOperation createFlushOperation(ILSMIOOperationCallback callback) {
+            LSMRTreeFileNameComponent fileNames = (LSMRTreeFileNameComponent) fileManager.getRelFlushFileName();
+            FileReference rtreeFile = fileManager.createFlushFile(fileNames.getRTreeFileName());
+            FileReference btreeFile = fileManager.createFlushFile(fileNames.getBTreeFileName());
+            return new LSMRTreeFlushOperation(lsmHarness.getIndex(), rtreeFile, btreeFile, callback);
+        }
     }
 
     @Override
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
new file mode 100644
index 0000000..5ec4776
--- /dev/null
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
@@ -0,0 +1,60 @@
+package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+
+import java.util.Collections;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+
+public class LSMRTreeFlushOperation implements ILSMIOOperation {
+
+    private final ILSMIndex index;
+    private final FileReference rtreeFlushTarget;
+    private final FileReference btreeFlushTarget;
+    private final ILSMIOOperationCallback callback;
+
+    public LSMRTreeFlushOperation(ILSMIndex index, FileReference rtreeFlushTarget, FileReference btreeFlushTarget,
+            ILSMIOOperationCallback callback) {
+        this.index = index;
+        this.rtreeFlushTarget = rtreeFlushTarget;
+        this.btreeFlushTarget = btreeFlushTarget;
+        this.callback = callback;
+    }
+
+    @Override
+    public List<IODeviceHandle> getReadDevices() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public List<IODeviceHandle> getWriteDevices() {
+        return Collections.singletonList(rtreeFlushTarget.getDevideHandle());
+    }
+
+    @Override
+    public void perform() throws HyracksDataException, IndexException {
+        ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+                NoOpOperationCallback.INSTANCE);
+        accessor.flush(this);
+    }
+
+    @Override
+    public ILSMIOOperationCallback getCallback() {
+        return callback;
+    }
+
+    public FileReference getRTreeFlushTarget() {
+        return rtreeFlushTarget;
+    }
+
+    public FileReference getBTreeFlushTarget() {
+        return btreeFlushTarget;
+    }
+}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
new file mode 100644
index 0000000..e55f2c1
--- /dev/null
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
@@ -0,0 +1,80 @@
+package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.AbstractLSMRTree.LSMRTreeComponent;
+
+public class LSMRTreeMergeOperation implements ILSMIOOperation {
+    private final ILSMIndex index;
+    private final List<Object> mergingComponents;
+    private final ITreeIndexCursor cursor;
+    private final FileReference rtreeMergeTarget;
+    private final FileReference btreeMergeTarget;
+    private final ILSMIOOperationCallback callback;
+
+    public LSMRTreeMergeOperation(ILSMIndex index, List<Object> mergingComponents, ITreeIndexCursor cursor,
+            FileReference rtreeMergeTarget, FileReference btreeMergeTarget, ILSMIOOperationCallback callback) {
+        this.index = index;
+        this.mergingComponents = mergingComponents;
+        this.cursor = cursor;
+        this.rtreeMergeTarget = rtreeMergeTarget;
+        this.btreeMergeTarget = btreeMergeTarget;
+        this.callback = callback;
+    }
+
+    @Override
+    public List<IODeviceHandle> getReadDevices() {
+        List<IODeviceHandle> devs = new ArrayList<IODeviceHandle>();
+        for (Object o : mergingComponents) {
+            LSMRTreeComponent component = (LSMRTreeComponent) o;
+            devs.add(component.getRTree().getFileReference().getDevideHandle());
+        }
+        return devs;
+    }
+
+    @Override
+    public List<IODeviceHandle> getWriteDevices() {
+        return Collections.singletonList(rtreeMergeTarget.getDevideHandle());
+    }
+
+    @Override
+    public void perform() throws HyracksDataException, IndexException {
+        ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+                NoOpOperationCallback.INSTANCE);
+        accessor.merge(this);
+    }
+
+    @Override
+    public ILSMIOOperationCallback getCallback() {
+        return callback;
+    }
+
+    public FileReference getRTreeMergeTarget() {
+        return rtreeMergeTarget;
+    }
+
+    public FileReference getBTreeMergeTarget() {
+        return btreeMergeTarget;
+    }
+
+    public ITreeIndexCursor getCursor() {
+        return cursor;
+    }
+
+    public List<Object> getMergingComponents() {
+        return mergingComponents;
+    }
+
+}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 9e1bdb3..2f94339 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -44,11 +44,15 @@
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileManager;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMFlushOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMMergeOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexComponentFinalizer;
@@ -134,23 +138,6 @@
         super.clear();
     }
 
-    private RTree createFlushTarget() throws HyracksDataException {
-        String relFlushFileName = (String) fileManager.getRelFlushFileName();
-        FileReference fileRef = fileManager.createFlushFile(relFlushFileName);
-        return (RTree) createDiskTree(diskRTreeFactory, fileRef, true);
-    }
-
-    private RTree createMergeTarget(List<Object> mergingDiskRTrees) throws HyracksDataException {
-        RTree firstRTree = (RTree) mergingDiskRTrees.get(0);
-        RTree lastRTree = (RTree) mergingDiskRTrees.get(mergingDiskRTrees.size() - 1);
-        FileReference firstFile = diskFileMapProvider.lookupFileName(firstRTree.getFileId());
-        FileReference lastFile = diskFileMapProvider.lookupFileName(lastRTree.getFileId());
-        String relMergeFileName = (String) fileManager.getRelMergeFileName(firstFile.getFile().getName(), lastFile
-                .getFile().getName());
-        FileReference fileRef = fileManager.createMergeFile(relMergeFileName);
-        return (RTree) createDiskTree(diskRTreeFactory, fileRef, true);
-    }
-
     public void search(IIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred, IIndexOpContext ictx,
             boolean includeMemComponent, AtomicInteger searcherRefCount) throws HyracksDataException, IndexException {
         LSMRTreeOpContext ctx = (LSMRTreeOpContext) ictx;
@@ -191,7 +178,8 @@
     }
 
     @Override
-    public ITreeIndex flush() throws HyracksDataException, IndexException {
+    public ITreeIndex flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
+        LSMFlushOperation flushOp = (LSMFlushOperation) operation;
         // Renaming order is critical because we use assume ordering when we
         // read the file names when we open the tree.
         // The RTree should be renamed before the BTree.
@@ -202,7 +190,7 @@
         RTreeSearchCursor rtreeScanCursor = (RTreeSearchCursor) memRTreeAccessor.createSearchCursor();
         SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
         memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
-        RTree diskRTree = createFlushTarget();
+        RTree diskRTree = (RTree) createDiskTree(diskRTreeFactory, flushOp.getFlushTarget(), true);
 
         // scan the memory BTree
         ITreeIndexAccessor memBTreeAccessor = memComponent.getBTree().createAccessor(NoOpOperationCallback.INSTANCE,
@@ -282,15 +270,11 @@
     }
 
     @Override
-    public ITreeIndex merge(List<Object> mergedComponents) throws HyracksDataException, IndexException {
-        LSMRTreeOpContext ctx = createOpContext();
-        ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor();
-        ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
-        // Ordered scan, ignoring the in-memory RTree.
-        // We get back a snapshot of the on-disk RTrees that are going to be
-        // merged now, so we can clean them up after the merge has completed.
-        List<Object> mergingDiskRTrees = lsmHarness.search(cursor, (SearchPredicate) rtreeSearchPred, ctx, false);
-        mergedComponents.addAll(mergingDiskRTrees);
+    public ITreeIndex merge(List<Object> mergedComponents, ILSMIOOperation operation) throws HyracksDataException,
+            IndexException {
+        LSMMergeOperation mergeOp = (LSMMergeOperation) operation;
+        ITreeIndexCursor cursor = mergeOp.getCursor();
+        mergedComponents.addAll(mergeOp.getMergingComponents());
 
         // Nothing to merge.
         if (mergedComponents.size() <= 1) {
@@ -299,7 +283,7 @@
         }
 
         // Bulk load the tuples from all on-disk RTrees into the new RTree.
-        RTree mergedRTree = createMergeTarget(mergingDiskRTrees);
+        RTree mergedRTree = (RTree) createDiskTree(diskRTreeFactory, mergeOp.getMergeTarget(), true);
         IIndexBulkLoader bulkloader = mergedRTree.createBulkLoader(1.0f);
         try {
             while (cursor.hasNext()) {
@@ -344,6 +328,13 @@
             LSMRTreeOpContext concreteCtx = (LSMRTreeOpContext) ctx;
             return concreteCtx.rtreeOpContext.cmp;
         }
+
+        @Override
+        public ILSMIOOperation createFlushOperation(ILSMIOOperationCallback callback) {
+            String relFlushFileName = (String) fileManager.getRelFlushFileName();
+            FileReference fileRef = fileManager.createFlushFile(relFlushFileName);
+            return new LSMFlushOperation(lsmHarness.getIndex(), fileRef, callback);
+        }
     }
 
     @Override
@@ -384,4 +375,33 @@
         }
 
     }
+
+    @Override
+    public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException {
+        LSMRTreeOpContext ctx = createOpContext();
+        ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor();
+        ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
+        // Ordered scan, ignoring the in-memory RTree.
+        // We get back a snapshot of the on-disk RTrees that are going to be
+        // merged now, so we can clean them up after the merge has completed.
+        List<Object> mergingDiskRTrees;
+        try {
+            mergingDiskRTrees = lsmHarness.search(cursor, (SearchPredicate) rtreeSearchPred, ctx, false);
+            if (mergingDiskRTrees.size() <= 1) {
+                cursor.close();
+                return null;
+            }
+        } catch (IndexException e) {
+            throw new HyracksDataException(e);
+        }
+
+        RTree firstRTree = (RTree) mergingDiskRTrees.get(0);
+        RTree lastRTree = (RTree) mergingDiskRTrees.get(mergingDiskRTrees.size() - 1);
+        FileReference firstFile = diskFileMapProvider.lookupFileName(firstRTree.getFileId());
+        FileReference lastFile = diskFileMapProvider.lookupFileName(lastRTree.getFileId());
+        String relMergeFileName = (String) fileManager.getRelMergeFileName(firstFile.getFile().getName(), lastFile
+                .getFile().getName());
+        FileReference fileRef = fileManager.createMergeFile(relMergeFileName);
+        return new LSMMergeOperation(lsmHarness.getIndex(), mergingDiskRTrees, cursor, fileRef, callback);
+    }
 }
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
index 4d471b8..3add02e 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
@@ -24,7 +24,9 @@
 import edu.uci.ics.hyracks.storage.am.btree.OrderedIndexTestUtils;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
 import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
 
 @SuppressWarnings("rawtypes")
 public abstract class LSMBTreeMergeTestDriver extends OrderedIndexTestDriver {
@@ -63,7 +65,10 @@
             }
 
             ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
-            accessor.merge();
+            ILSMIOOperation ioop = accessor.createMergeOperation(NoOpIOOperationCallback.INSTANCE);
+            if (ioop != null) {
+                accessor.merge(ioop);
+            }
 
             orderedIndexTestUtils.checkPointSearches(ctx);
             orderedIndexTestUtils.checkScan(ctx);
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
index 0cbfb64..377b959 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
@@ -33,29 +33,31 @@
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree.LSMBTreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMMergeInProgressException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
 
 public class LSMBTreeTestWorker extends AbstractTreeIndexTestWorker {
-    
+
     private final LSMBTree lsmBTree;
     private final int numKeyFields;
     private final ArrayTupleBuilder deleteTb;
     private final ArrayTupleReference deleteTuple = new ArrayTupleReference();
-    
+
     public LSMBTreeTestWorker(DataGenThread dataGen, TestOperationSelector opSelector, ITreeIndex index, int numBatches) {
         super(dataGen, opSelector, index, numBatches);
         lsmBTree = (LSMBTree) index;
         numKeyFields = lsmBTree.getComparatorFactories().length;
         deleteTb = new ArrayTupleBuilder(numKeyFields);
     }
-    
+
     @Override
-    public void performOp(ITupleReference tuple, TestOperation op) throws HyracksDataException, IndexException {        
+    public void performOp(ITupleReference tuple, TestOperation op) throws HyracksDataException, IndexException {
         LSMBTreeIndexAccessor accessor = (LSMBTreeIndexAccessor) indexAccessor;
         IIndexCursor searchCursor = accessor.createSearchCursor();
         MultiComparator cmp = accessor.getMultiComparator();
         RangePredicate rangePred = new RangePredicate(tuple, tuple, true, true, cmp, cmp);
-        
+
         switch (op) {
             case INSERT:
                 try {
@@ -64,7 +66,7 @@
                     // Ignore duplicate keys, since we get random tuples.
                 }
                 break;
-                
+
             case DELETE:
                 // Create a tuple reference with only key fields.
                 deleteTb.reset();
@@ -78,8 +80,8 @@
                     // Ignore non-existant keys, since we get random tuples.
                 }
                 break;
-                
-            case UPDATE: 
+
+            case UPDATE:
                 try {
                     accessor.update(tuple);
                 } catch (BTreeNonExistentKeyException e) {
@@ -88,15 +90,15 @@
                     // Ignore not updateable exception due to numKeys == numFields.
                 }
                 break;
-                
-            case POINT_SEARCH: 
+
+            case POINT_SEARCH:
                 searchCursor.reset();
                 rangePred.setLowKey(tuple, true);
                 rangePred.setHighKey(tuple, true);
                 accessor.search(searchCursor, rangePred);
                 consumeCursorTuples(searchCursor);
                 break;
-                
+
             case SCAN:
                 searchCursor.reset();
                 rangePred.setLowKey(null, true);
@@ -104,10 +106,13 @@
                 accessor.search(searchCursor, rangePred);
                 consumeCursorTuples(searchCursor);
                 break;
-                
+
             case MERGE:
                 try {
-                    accessor.merge();
+                    ILSMIOOperation ioop = accessor.createMergeOperation(NoOpIOOperationCallback.INSTANCE);
+                    if (ioop != null) {
+                        accessor.merge(ioop);
+                    }
                 } catch (LSMMergeInProgressException e) {
                     // Ignore ongoing merges. Do an insert instead.
                     try {
@@ -117,7 +122,7 @@
                     }
                 }
                 break;
-                
+
             default:
                 throw new HyracksDataException("Op " + op.toString() + " not supported.");
         }
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
index da66966..36c3c5c 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
@@ -21,7 +21,9 @@
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeTestContext;
 import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeTestDriver;
 import edu.uci.ics.hyracks.storage.am.rtree.RTreeTestUtils;
@@ -33,7 +35,7 @@
     private final RTreeTestUtils rTreeTestUtils;
 
     public LSMRTreeMergeTestDriver(boolean testRstarPolicy) {
-    	super(testRstarPolicy);
+        super(testRstarPolicy);
         this.rTreeTestUtils = new RTreeTestUtils();
     }
 
@@ -65,7 +67,10 @@
             }
 
             ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
-            accessor.merge();
+            ILSMIOOperation mergeOperation = accessor.createMergeOperation(NoOpIOOperationCallback.INSTANCE);
+            if (mergeOperation != null) {
+                accessor.merge(mergeOperation);
+            }
 
             rTreeTestUtils.checkScan(ctx);
             rTreeTestUtils.checkDiskOrderScan(ctx);
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
index bcbcf71..59a7728 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
@@ -27,7 +27,9 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.common.datagen.DataGenThread;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMMergeInProgressException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTree;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTree.LSMRTreeAccessor;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.SearchPredicate;
@@ -73,7 +75,10 @@
 
             case MERGE:
                 try {
-                    accessor.merge();
+                    ILSMIOOperation mergeOperation = accessor.createMergeOperation(NoOpIOOperationCallback.INSTANCE);
+                    if (mergeOperation != null) {
+                        accessor.merge(mergeOperation);
+                    }
                 } catch (LSMMergeInProgressException e) {
                     // Ignore ongoing merges. Do an insert instead.
                     rearrangeTuple(tuple, cmp);
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
index 8c5613a..0f8cdec 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
@@ -24,7 +24,9 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.common.datagen.DataGenThread;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMMergeInProgressException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeWithAntiMatterTuples.LSMRTreeWithAntiMatterTuplesAccessor;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.SearchPredicate;
 
@@ -62,7 +64,10 @@
 
             case MERGE:
                 try {
-                    accessor.merge();
+                    ILSMIOOperation mergeOperation = accessor.createMergeOperation(NoOpIOOperationCallback.INSTANCE);
+                    if (mergeOperation != null) {
+                        accessor.merge(mergeOperation);
+                    }
                 } catch (LSMMergeInProgressException e) {
                     // Ignore ongoing merges. Do an insert instead.
                     rearrangeTuple(tuple, cmp);