Add LSMDiskComponentBulkLoader

-Added LSMDiskComponentBulkLoader implementations, which are used to bulk
load an LSMDiskComponent with anti-matters
-Added LSMDiskComponentWithBuddyBTreeBulkLoader implementations,
which are used to bulk load an LSMDiskComponent with deleted-keys btrees
-Refactored LSM flush/merge/index bulk load operations to use
the LSMDiskComponentBulkLoader

Change-Id: I772a6d68761fcbb85982a1c9f72f2d186e1d1ffb
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1773
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
index 1d8de79..c641dc1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
@@ -28,12 +28,9 @@
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.bloomfilter.impls.BloomCalculations;
 import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
 import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory;
-import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
 import org.apache.hyracks.storage.am.btree.impls.BTree;
-import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeBulkLoader;
 import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
@@ -451,14 +448,11 @@
     // modifications
     public class LSMTwoPCBTreeBulkLoader implements IIndexBulkLoader, ITwoPCIndexBulkLoader {
         private final ILSMDiskComponent component;
-        private final BTreeBulkLoader bulkLoader;
-        private final IIndexBulkLoader builder;
-        private boolean cleanedUpArtifacts = false;
-        private boolean isEmptyComponent = true;
-        private boolean endedBloomFilterLoad = false;
-        private final boolean isTransaction;
+        private final IIndexBulkLoader componentBulkLoader;
         private final ITreeIndexTupleWriterFactory frameTupleWriterFactory;
 
+        private final boolean isTransaction;
+
         public LSMTwoPCBTreeBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
                 boolean isTransaction) throws HyracksDataException {
             this.isTransaction = isTransaction;
@@ -471,68 +465,23 @@
 
             frameTupleWriterFactory =
                     ((LSMBTreeDiskComponent) component).getBTree().getLeafFrameFactory().getTupleWriterFactory();
-            bulkLoader = (BTreeBulkLoader) ((LSMBTreeDiskComponent) component).getBTree().createBulkLoader(fillFactor,
-                    verifyInput, numElementsHint, false);
 
-            int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
-            BloomFilterSpecification bloomFilterSpec =
-                    BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
-            builder = ((LSMBTreeDiskComponent) component).getBloomFilter().createBuilder(numElementsHint,
-                    bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
+            componentBulkLoader =
+                    createComponentBulkLoader(component, fillFactor, verifyInput, numElementsHint, false, true);
         }
 
         // It is expected that the mode was set to insert operation before
         // calling add
         @Override
         public void add(ITupleReference tuple) throws HyracksDataException {
-            try {
-                bulkLoader.add(tuple);
-                builder.add(tuple);
-            } catch (Exception e) {
-                cleanupArtifacts();
-                throw e;
-            }
-            if (isEmptyComponent) {
-                isEmptyComponent = false;
-            }
-        }
-
-        // This is made public in case of a failure, it is better to delete all
-        // created artifacts.
-        public void cleanupArtifacts() throws HyracksDataException {
-            if (!cleanedUpArtifacts) {
-                cleanedUpArtifacts = true;
-                // We make sure to end the bloom filter load to release latches.
-                if (!endedBloomFilterLoad) {
-                    builder.end();
-                    endedBloomFilterLoad = true;
-                }
-                try {
-                    ((LSMBTreeDiskComponent) component).getBTree().deactivate();
-                } catch (HyracksDataException e) {
-                    // Do nothing.. this could've bee
-                }
-                ((LSMBTreeDiskComponent) component).getBTree().destroy();
-                try {
-                    ((LSMBTreeDiskComponent) component).getBloomFilter().deactivate();
-                } catch (HyracksDataException e) {
-                    // Do nothing.. this could've bee
-                }
-                ((LSMBTreeDiskComponent) component).getBloomFilter().destroy();
-            }
+            componentBulkLoader.add(tuple);
         }
 
         @Override
         public void end() throws HyracksDataException {
-            if (!cleanedUpArtifacts) {
-                if (!endedBloomFilterLoad) {
-                    builder.end();
-                    endedBloomFilterLoad = true;
-                }
-                bulkLoader.end();
-                if (isEmptyComponent) {
-                    cleanupArtifacts();
-                } else if (isTransaction) {
+            componentBulkLoader.end();
+            if (component.getComponentSize() > 0) {
+                if (isTransaction) {
                     // Since this is a transaction component, validate and
                     // deactivate. it could later be added or deleted
                     markAsValid(component);
@@ -551,23 +500,14 @@
         @Override
         public void delete(ITupleReference tuple) throws HyracksDataException {
             ((LSMBTreeRefrencingTupleWriterFactory) frameTupleWriterFactory).setMode(IndexOperation.DELETE);
-            try {
-                bulkLoader.add(tuple);
-                builder.add(tuple);
-            } catch (Exception e) {
-                cleanupArtifacts();
-                throw e;
-            }
-            if (isEmptyComponent) {
-                isEmptyComponent = false;
-            }
+            componentBulkLoader.add(tuple);
             ((LSMBTreeRefrencingTupleWriterFactory) frameTupleWriterFactory).setMode(IndexOperation.INSERT);
         }
 
         @Override
         public void abort() {
             try {
-                cleanupArtifacts();
+                componentBulkLoader.abort();
             } catch (Exception e) {
                 // Do nothing
             }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
index fed4588..dfa08d6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
@@ -35,7 +35,6 @@
 import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory;
 import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
 import org.apache.hyracks.storage.am.btree.impls.BTree;
-import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeBulkLoader;
 import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
 import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
@@ -70,6 +69,7 @@
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.MultiComparator;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.file.IFileMapProvider;
 
@@ -274,6 +274,26 @@
         newerList.add(swapIndex, newComponent);
     }
 
+    @Override
+    public IIndexBulkLoader createComponentBulkLoader(ILSMDiskComponent component, float fillFactor,
+            boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter)
+            throws HyracksDataException {
+        BloomFilterSpecification bloomFilterSpec = null;
+        if (numElementsHint > 0) {
+            int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
+            bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
+        }
+        if (withFilter && filterFields != null) {
+            return new LSMBTreeWithBuddyDiskComponentBulkLoader((LSMBTreeWithBuddyDiskComponent) component,
+                    bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager,
+                    treeFields, filterFields,
+                    MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories()));
+        } else {
+            return new LSMBTreeWithBuddyDiskComponentBulkLoader((LSMBTreeWithBuddyDiskComponent) component,
+                    bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+        }
+    }
+
     // For initial load
     @Override
     public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
@@ -372,6 +392,8 @@
         LSMBTreeWithBuddyDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getTarget(),
                 mergeOp.getBuddyBTreeTarget(), mergeOp.getBloomFilterTarget(), true);
 
+        IIndexBulkLoader componentBulkLoader;
+
         // In case we must keep the deleted-keys BuddyBTrees, then they must be
         // merged *before* merging the b-trees so that
         // lsmHarness.endSearch() is called once when the b-trees have been
@@ -383,46 +405,37 @@
             LSMBuddyBTreeMergeCursor buddyBtreeCursor = new LSMBuddyBTreeMergeCursor(opCtx);
             search(opCtx, buddyBtreeCursor, btreeSearchPred);
 
-            BTree buddyBtree = mergedComponent.getBuddyBTree();
-            IIndexBulkLoader buddyBtreeBulkLoader = buddyBtree.createBulkLoader(1.0f, true, 0L, false);
-
             long numElements = 0L;
             for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) {
                 numElements += ((LSMBTreeWithBuddyDiskComponent) mergeOp.getMergingComponents().get(i)).getBloomFilter()
                         .getNumElements();
             }
 
-            int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
-            BloomFilterSpecification bloomFilterSpec =
-                    BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
-            IIndexBulkLoader builder = mergedComponent.getBloomFilter().createBuilder(numElements,
-                    bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
+            componentBulkLoader = createComponentBulkLoader(mergedComponent, 1.0f, false, numElements, false, false);
 
             try {
                 while (buddyBtreeCursor.hasNext()) {
                     buddyBtreeCursor.next();
                     ITupleReference tuple = buddyBtreeCursor.getTuple();
-                    buddyBtreeBulkLoader.add(tuple);
-                    builder.add(tuple);
+                    ((LSMBTreeWithBuddyDiskComponentBulkLoader) componentBulkLoader).delete(tuple);
                 }
             } finally {
                 buddyBtreeCursor.close();
-                builder.end();
             }
-            buddyBtreeBulkLoader.end();
+        } else {
+            componentBulkLoader = createComponentBulkLoader(mergedComponent, 1.0f, false, 0L, false, false);
         }
 
-        IIndexBulkLoader bulkLoader = mergedComponent.getBTree().createBulkLoader(1.0f, false, 0L, false);
         try {
             while (cursor.hasNext()) {
                 cursor.next();
                 ITupleReference frameTuple = cursor.getTuple();
-                bulkLoader.add(frameTuple);
+                componentBulkLoader.add(frameTuple);
             }
         } finally {
             cursor.close();
         }
-        bulkLoader.end();
+        componentBulkLoader.end();
         return mergedComponent;
     }
 
@@ -589,12 +602,7 @@
     // modifications
     public class LSMTwoPCBTreeWithBuddyBulkLoader implements IIndexBulkLoader, ITwoPCIndexBulkLoader {
         private final ILSMDiskComponent component;
-        private final BTreeBulkLoader btreeBulkLoader;
-        private final BTreeBulkLoader buddyBtreeBulkLoader;
-        private final IIndexBulkLoader builder;
-        private boolean cleanedUpArtifacts = false;
-        private boolean isEmptyComponent = true;
-        private boolean endedBloomFilterLoad = false;
+        private final IIndexBulkLoader componentBulkLoader;
         private final boolean isTransaction;
 
         public LSMTwoPCBTreeWithBuddyBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
@@ -607,69 +615,20 @@
                 component = createBulkLoadTarget();
             }
 
-            // Create the three loaders
-            btreeBulkLoader = (BTreeBulkLoader) ((LSMBTreeWithBuddyDiskComponent) component).getBTree()
-                    .createBulkLoader(fillFactor, verifyInput, numElementsHint, false);
-            buddyBtreeBulkLoader = (BTreeBulkLoader) ((LSMBTreeWithBuddyDiskComponent) component).getBuddyBTree()
-                    .createBulkLoader(fillFactor, verifyInput, numElementsHint, false);
-            int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
-            BloomFilterSpecification bloomFilterSpec =
-                    BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
-            builder = ((LSMBTreeWithBuddyDiskComponent) component).getBloomFilter().createBuilder(numElementsHint,
-                    bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
+            componentBulkLoader =
+                    createComponentBulkLoader(component, fillFactor, verifyInput, numElementsHint, false, true);
         }
 
         @Override
         public void add(ITupleReference tuple) throws HyracksDataException {
-            try {
-                btreeBulkLoader.add(tuple);
-            } catch (Exception e) {
-                cleanupArtifacts();
-                throw e;
-            }
-            if (isEmptyComponent) {
-                isEmptyComponent = false;
-            }
-        }
-
-        // This is made public in case of a failure, it is better to delete all
-        // created artifacts.
-        public void cleanupArtifacts() throws HyracksDataException {
-            if (!cleanedUpArtifacts) {
-                cleanedUpArtifacts = true;
-                try {
-                    ((LSMBTreeWithBuddyDiskComponent) component).getBTree().deactivate();
-                } catch (Exception e) {
-
-                }
-                ((LSMBTreeWithBuddyDiskComponent) component).getBTree().destroy();
-                try {
-                    ((LSMBTreeWithBuddyDiskComponent) component).getBuddyBTree().deactivate();
-                } catch (Exception e) {
-
-                }
-                ((LSMBTreeWithBuddyDiskComponent) component).getBuddyBTree().destroy();
-                try {
-                    ((LSMBTreeWithBuddyDiskComponent) component).getBloomFilter().deactivate();
-                } catch (Exception e) {
-
-                }
-                ((LSMBTreeWithBuddyDiskComponent) component).getBloomFilter().destroy();
-            }
+            componentBulkLoader.add(tuple);
         }
 
         @Override
         public void end() throws HyracksDataException {
-            if (!cleanedUpArtifacts) {
-                if (!endedBloomFilterLoad) {
-                    builder.end();
-                    endedBloomFilterLoad = true;
-                }
-                btreeBulkLoader.end();
-                buddyBtreeBulkLoader.end();
-                if (isEmptyComponent) {
-                    cleanupArtifacts();
-                } else if (isTransaction) {
+            componentBulkLoader.end();
+            if (component.getComponentSize() > 0) {
+                if (isTransaction) {
                     // Since this is a transaction component, validate and
                     // deactivate. it could later be added or deleted
                     markAsValid(component);
@@ -687,22 +646,13 @@
 
         @Override
         public void delete(ITupleReference tuple) throws HyracksDataException {
-            try {
-                buddyBtreeBulkLoader.add(tuple);
-                builder.add(tuple);
-            } catch (Exception e) {
-                cleanupArtifacts();
-                throw e;
-            }
-            if (isEmptyComponent) {
-                isEmptyComponent = false;
-            }
+            ((LSMBTreeWithBuddyDiskComponentBulkLoader) componentBulkLoader).delete(tuple);
         }
 
         @Override
         public void abort() {
             try {
-                cleanupArtifacts();
+                componentBulkLoader.abort();
             } catch (Exception e) {
             }
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 4cd4dc6..a4c67c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -73,6 +73,7 @@
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.MultiComparator;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.file.IFileMapProvider;
 
@@ -292,12 +293,10 @@
 
         RangePredicate nullPred = new RangePredicate(null, null, true, true, null, null);
         long numElements = 0L;
-        BloomFilterSpecification bloomFilterSpec = null;
         if (hasBloomFilter) {
             //count elements in btree for creating Bloomfilter
             IIndexCursor countingCursor = ((BTreeAccessor) accessor).createCountingSearchCursor();
             accessor.search(countingCursor, nullPred);
-
             try {
                 while (countingCursor.hasNext()) {
                     countingCursor.next();
@@ -307,35 +306,23 @@
             } finally {
                 countingCursor.close();
             }
-
-            int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
-            bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
         }
 
         LSMBTreeDiskComponent component =
                 createDiskComponent(componentFactory, flushOp.getTarget(), flushOp.getBloomFilterTarget(), true);
-        IIndexBulkLoader bulkLoader = component.getBTree().createBulkLoader(1.0f, false, numElements, false);
-        IIndexBulkLoader builder = null;
-        if (hasBloomFilter) {
-            builder = component.getBloomFilter().createBuilder(numElements, bloomFilterSpec.getNumHashes(),
-                    bloomFilterSpec.getNumBucketsPerElements());
-        }
+
+        IIndexBulkLoader componentBulkLoader =
+                createComponentBulkLoader(component, 1.0f, false, numElements, false, false);
 
         IIndexCursor scanCursor = accessor.createSearchCursor(false);
         accessor.search(scanCursor, nullPred);
         try {
             while (scanCursor.hasNext()) {
                 scanCursor.next();
-                if (hasBloomFilter) {
-                    builder.add(scanCursor.getTuple());
-                }
-                bulkLoader.add(scanCursor.getTuple());
+                componentBulkLoader.add(scanCursor.getTuple());
             }
         } finally {
             scanCursor.close();
-            if (hasBloomFilter) {
-                builder.end();
-            }
         }
 
         if (component.getLSMComponentFilter() != null) {
@@ -353,7 +340,8 @@
         // TODO This code should be in the callback and not in the index
         flushingComponent.getMetadata().copy(component.getMetadata());
 
-        bulkLoader.end();
+        componentBulkLoader.end();
+
         return component;
     }
 
@@ -368,38 +356,25 @@
         List<ILSMComponent> mergedComponents = mergeOp.getMergingComponents();
 
         long numElements = 0L;
-        BloomFilterSpecification bloomFilterSpec = null;
         if (hasBloomFilter) {
             //count elements in btree for creating Bloomfilter
             for (int i = 0; i < mergedComponents.size(); ++i) {
                 numElements += ((LSMBTreeDiskComponent) mergedComponents.get(i)).getBloomFilter().getNumElements();
             }
-            int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
-            bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
         }
         LSMBTreeDiskComponent mergedComponent =
                 createDiskComponent(componentFactory, mergeOp.getTarget(), mergeOp.getBloomFilterTarget(), true);
 
-        IIndexBulkLoader bulkLoader = mergedComponent.getBTree().createBulkLoader(1.0f, false, numElements, false);
-        IIndexBulkLoader builder = null;
-        if (hasBloomFilter) {
-            builder = mergedComponent.getBloomFilter().createBuilder(numElements, bloomFilterSpec.getNumHashes(),
-                    bloomFilterSpec.getNumBucketsPerElements());
-        }
+        IIndexBulkLoader componentBulkLoader =
+                createComponentBulkLoader(mergedComponent, 1.0f, false, numElements, false, false);
         try {
             while (cursor.hasNext()) {
                 cursor.next();
                 ITupleReference frameTuple = cursor.getTuple();
-                if (hasBloomFilter) {
-                    builder.add(frameTuple);
-                }
-                bulkLoader.add(frameTuple);
+                componentBulkLoader.add(frameTuple);
             }
         } finally {
             cursor.close();
-            if (hasBloomFilter) {
-                builder.end();
-            }
         }
         if (mergedComponent.getLSMComponentFilter() != null) {
             List<ITupleReference> filterTuples = new ArrayList<>();
@@ -410,7 +385,9 @@
             getFilterManager().updateFilter(mergedComponent.getLSMComponentFilter(), filterTuples);
             getFilterManager().writeFilter(mergedComponent.getLSMComponentFilter(), mergedComponent.getBTree());
         }
-        bulkLoader.end();
+
+        componentBulkLoader.end();
+
         return mergedComponent;
     }
 
@@ -438,6 +415,27 @@
     }
 
     @Override
+    public IIndexBulkLoader createComponentBulkLoader(ILSMDiskComponent component, float fillFactor,
+            boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter)
+            throws HyracksDataException {
+        BloomFilterSpecification bloomFilterSpec = null;
+        if (hasBloomFilter) {
+            int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
+            bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
+        }
+
+        if (withFilter && filterFields != null) {
+            return new LSMBTreeDiskComponentBulkLoader((LSMBTreeDiskComponent) component, bloomFilterSpec, fillFactor,
+                    verifyInput, numElementsHint, checkIfEmptyIndex, filterManager, treeFields, filterFields,
+                    MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories()));
+        } else {
+            return new LSMBTreeDiskComponentBulkLoader((LSMBTreeDiskComponent) component, bloomFilterSpec, fillFactor,
+                    verifyInput, numElementsHint, checkIfEmptyIndex);
+        }
+
+    }
+
+    @Override
     public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
             throws HyracksDataException {
         return new LSMBTreeBulkLoader(this, fillLevel, verifyInput, numElementsHint);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeBulkLoader.java
index 247b3de..239eba2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeBulkLoader.java
@@ -20,135 +20,43 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.bloomfilter.impls.BloomCalculations;
-import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
-import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeBulkLoader;
-import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
-import org.apache.hyracks.storage.common.MultiComparator;
 
 public class LSMBTreeBulkLoader implements IIndexBulkLoader {
     private final LSMBTree lsmIndex;
     private final ILSMDiskComponent component;
-    private final BTreeBulkLoader bulkLoader;
-    private final IIndexBulkLoader builder;
-    private boolean cleanedUpArtifacts = false;
-    private boolean isEmptyComponent = true;
-    private boolean endedBloomFilterLoad = false;
-    public final PermutingTupleReference indexTuple;
-    public final PermutingTupleReference filterTuple;
-    public final MultiComparator filterCmp;
+    private final IIndexBulkLoader componentBulkLoader;
 
     public LSMBTreeBulkLoader(LSMBTree lsmIndex, float fillFactor, boolean verifyInput, long numElementsHint)
             throws HyracksDataException {
         this.lsmIndex = lsmIndex;
-        component = lsmIndex.createBulkLoadTarget();
-        bulkLoader = (BTreeBulkLoader) ((LSMBTreeDiskComponent) component).getBTree().createBulkLoader(fillFactor,
-                verifyInput, numElementsHint, false);
-
-        if (lsmIndex.hasBloomFilter()) {
-            int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
-            BloomFilterSpecification bloomFilterSpec =
-                    BloomCalculations.computeBloomSpec(maxBucketsPerElement, lsmIndex.bloomFilterFalsePositiveRate());
-            builder = ((LSMBTreeDiskComponent) component).getBloomFilter().createBuilder(numElementsHint,
-                    bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
-        } else {
-            builder = null;
-        }
-
-        if (lsmIndex.getFilterFields() != null) {
-            indexTuple = new PermutingTupleReference(lsmIndex.getTreeFields());
-            filterCmp = MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories());
-            filterTuple = new PermutingTupleReference(lsmIndex.getFilterFields());
-        } else {
-            indexTuple = null;
-            filterCmp = null;
-            filterTuple = null;
-        }
+        this.component = lsmIndex.createBulkLoadTarget();
+        this.componentBulkLoader =
+                lsmIndex.createComponentBulkLoader(component, fillFactor, verifyInput, numElementsHint, false, true);
     }
 
     @Override
     public void add(ITupleReference tuple) throws HyracksDataException {
-        try {
-            ITupleReference t;
-            if (indexTuple != null) {
-                indexTuple.reset(tuple);
-                t = indexTuple;
-            } else {
-                t = tuple;
-            }
-
-            bulkLoader.add(t);
-            if (lsmIndex.hasBloomFilter()) {
-                builder.add(t);
-            }
-
-            if (filterTuple != null) {
-                filterTuple.reset(tuple);
-                component.getLSMComponentFilter().update(filterTuple, filterCmp);
-            }
-        } catch (Exception e) {
-            cleanupArtifacts();
-            throw e;
-        }
-        if (isEmptyComponent) {
-            isEmptyComponent = false;
-        }
-    }
-
-    private void cleanupArtifacts() throws HyracksDataException {
-        if (!cleanedUpArtifacts) {
-            cleanedUpArtifacts = true;
-            if (lsmIndex.hasBloomFilter() && !endedBloomFilterLoad) {
-                builder.abort();
-                endedBloomFilterLoad = true;
-            }
-            ((LSMBTreeDiskComponent) component).getBTree().deactivate();
-            ((LSMBTreeDiskComponent) component).getBTree().destroy();
-            if (lsmIndex.hasBloomFilter()) {
-                ((LSMBTreeDiskComponent) component).getBloomFilter().deactivate();
-                ((LSMBTreeDiskComponent) component).getBloomFilter().destroy();
-            }
-        }
+        componentBulkLoader.add(tuple);
     }
 
     @Override
     public void end() throws HyracksDataException {
-        if (!cleanedUpArtifacts) {
-            if (lsmIndex.hasBloomFilter() && !endedBloomFilterLoad) {
-                builder.end();
-                endedBloomFilterLoad = true;
-            }
-
-            if (component.getLSMComponentFilter() != null) {
-                lsmIndex.getFilterManager().writeFilter(component.getLSMComponentFilter(),
-                        ((LSMBTreeDiskComponent) component).getBTree());
-            }
-            bulkLoader.end();
-
-            if (isEmptyComponent) {
-                cleanupArtifacts();
-            } else {
-                //TODO(amoudi): Ensure Bulk load follow the same lifecycle Other Operations (Flush, Merge, etc).
-                //then after operation should be called from harness as well
-                //https://issues.apache.org/jira/browse/ASTERIXDB-1764
-                lsmIndex.getIOOperationCallback().afterOperation(LSMOperationType.FLUSH, null, component);
-                lsmIndex.getLsmHarness().addBulkLoadedComponent(component);
-            }
+        componentBulkLoader.end();
+        if (component.getComponentSize() > 0) {
+            //TODO(amoudi): Ensure Bulk load follow the same lifecycle Other Operations (Flush, Merge, etc).
+            //then after operation should be called from harness as well
+            //https://issues.apache.org/jira/browse/ASTERIXDB-1764
+            lsmIndex.getIOOperationCallback().afterOperation(LSMOperationType.FLUSH, null, component);
+            lsmIndex.getLsmHarness().addBulkLoadedComponent(component);
         }
     }
 
     @Override
     public void abort() throws HyracksDataException {
-        if (bulkLoader != null) {
-            bulkLoader.abort();
-        }
-
-        if (builder != null) {
-            builder.abort();
-        }
-
+        componentBulkLoader.end();
     }
+
 }
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentBulkLoader.java
new file mode 100644
index 0000000..a5720de
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentBulkLoader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentBulkLoader;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public class LSMBTreeDiskComponentBulkLoader extends AbstractLSMDiskComponentBulkLoader {
+
+    //with filter
+    public LSMBTreeDiskComponentBulkLoader(LSMBTreeDiskComponent component, BloomFilterSpecification bloomFilterSpec,
+            float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex,
+            ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields, MultiComparator filterCmp)
+            throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager,
+                indexFields, filterFields, filterCmp);
+    }
+
+    //without filter
+    public LSMBTreeDiskComponentBulkLoader(LSMBTreeDiskComponent component, BloomFilterSpecification bloomFilterSpec,
+            float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex)
+            throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, null, null, null,
+                null);
+    }
+
+    @Override
+    protected BloomFilter getBloomFilter(ILSMDiskComponent component) {
+        return ((LSMBTreeDiskComponent) component).getBloomFilter();
+    }
+
+    @Override
+    protected IIndex getIndex(ILSMDiskComponent component) {
+        return ((LSMBTreeDiskComponent) component).getBTree();
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponentBulkLoader.java
new file mode 100644
index 0000000..e4ab6d4
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponentBulkLoader.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentWithBuddyBulkLoader;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public class LSMBTreeWithBuddyDiskComponentBulkLoader extends AbstractLSMDiskComponentWithBuddyBulkLoader {
+
+    //with filter
+    public LSMBTreeWithBuddyDiskComponentBulkLoader(LSMBTreeWithBuddyDiskComponent component,
+            BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex, ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields,
+            MultiComparator filterCmp) throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager,
+                indexFields, filterFields, filterCmp);
+    }
+
+    //without filter
+    public LSMBTreeWithBuddyDiskComponentBulkLoader(LSMBTreeWithBuddyDiskComponent component,
+            BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex) throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, null, null, null,
+                null);
+    }
+
+    @Override
+    protected BloomFilter getBloomFilter(ILSMDiskComponent component) {
+        return ((LSMBTreeWithBuddyDiskComponent) component).getBloomFilter();
+    }
+
+    @Override
+    protected IIndex getIndex(ILSMDiskComponent component) {
+        return ((LSMBTreeWithBuddyDiskComponent) component).getBTree();
+    }
+
+    @Override
+    protected ITreeIndex getBuddyBTree(ILSMDiskComponent component) {
+        return ((LSMBTreeWithBuddyDiskComponent) component).getBuddyBTree();
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index 2a9186b..5a6f391 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -27,6 +27,7 @@
 import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMHarness;
 import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.IIndexBulkLoader;
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
@@ -139,4 +140,20 @@
      * @throws HyracksDataException
      */
     void updateFilter(ILSMIndexOperationContext ictx, ITupleReference tuple) throws HyracksDataException;
+
+    /**
+     * Create a component bulk loader for the given component
+     *
+     * @param component
+     * @param fillFactor
+     * @param verifyInput
+     * @param numElementsHint
+     * @param checkIfEmptyIndex
+     * @param withFilter
+     * @return
+     * @throws HyracksDataException
+     */
+    IIndexBulkLoader createComponentBulkLoader(ILSMDiskComponent component, float fillFactor, boolean verifyInput,
+            long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter) throws HyracksDataException;
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentBulkLoader.java
new file mode 100644
index 0000000..964893a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentBulkLoader.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public abstract class AbstractLSMDiskComponentBulkLoader implements IIndexBulkLoader {
+    protected final ILSMDiskComponent component;
+
+    protected final IIndexBulkLoader indexBulkLoader;
+    protected final IIndexBulkLoader bloomFilterBuilder;
+
+    protected final ILSMComponentFilterManager filterManager;
+    protected final PermutingTupleReference indexTuple;
+    protected final PermutingTupleReference filterTuple;
+    protected final MultiComparator filterCmp;
+
+    protected boolean cleanedUpArtifacts = false;
+    protected boolean isEmptyComponent = true;
+    protected boolean endedBloomFilterLoad = false;
+
+    //with filter
+    public AbstractLSMDiskComponentBulkLoader(ILSMDiskComponent component, BloomFilterSpecification bloomFilterSpec,
+            float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex,
+            ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields, MultiComparator filterCmp)
+            throws HyracksDataException {
+        this.component = component;
+        this.indexBulkLoader =
+                getIndex(component).createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+        if (bloomFilterSpec != null) {
+            this.bloomFilterBuilder = getBloomFilter(component).createBuilder(numElementsHint,
+                    bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
+        } else {
+            this.bloomFilterBuilder = null;
+        }
+        if (filterManager != null) {
+            this.filterManager = filterManager;
+            this.indexTuple = new PermutingTupleReference(indexFields);
+            this.filterTuple = new PermutingTupleReference(filterFields);
+            this.filterCmp = filterCmp;
+        } else {
+            this.filterManager = null;
+            this.indexTuple = null;
+            this.filterTuple = null;
+            this.filterCmp = null;
+        }
+    }
+
+    @Override
+    public void add(ITupleReference tuple) throws HyracksDataException {
+        try {
+            ITupleReference t;
+            if (indexTuple != null) {
+                indexTuple.reset(tuple);
+                t = indexTuple;
+            } else {
+                t = tuple;
+            }
+
+            indexBulkLoader.add(t);
+            if (bloomFilterBuilder != null) {
+                bloomFilterBuilder.add(t);
+            }
+
+            if (filterTuple != null) {
+                filterTuple.reset(tuple);
+                component.getLSMComponentFilter().update(filterTuple, filterCmp);
+            }
+        } catch (Exception e) {
+            cleanupArtifacts();
+            throw e;
+        }
+        if (isEmptyComponent) {
+            isEmptyComponent = false;
+        }
+    }
+
+    @Override
+    public void abort() throws HyracksDataException {
+        if (indexBulkLoader != null) {
+            indexBulkLoader.abort();
+        }
+        if (bloomFilterBuilder != null) {
+            bloomFilterBuilder.abort();
+        }
+
+    }
+
+    @Override
+    public void end() throws HyracksDataException {
+        if (!cleanedUpArtifacts) {
+            if (bloomFilterBuilder != null && !endedBloomFilterLoad) {
+                bloomFilterBuilder.end();
+                endedBloomFilterLoad = true;
+            }
+
+            //use filter
+            if (filterManager != null && component.getLSMComponentFilter() != null) {
+                filterManager.writeFilter(component.getLSMComponentFilter(), getTreeIndex(component));
+            }
+            indexBulkLoader.end();
+
+            if (isEmptyComponent) {
+                cleanupArtifacts();
+            }
+        }
+    }
+
+    protected void cleanupArtifacts() throws HyracksDataException {
+        if (!cleanedUpArtifacts) {
+            cleanedUpArtifacts = true;
+            if (bloomFilterBuilder != null && !endedBloomFilterLoad) {
+                bloomFilterBuilder.abort();
+                endedBloomFilterLoad = true;
+            }
+            getIndex(component).deactivate();
+            getIndex(component).destroy();
+            if (bloomFilterBuilder != null) {
+                getBloomFilter(component).deactivate();
+                getBloomFilter(component).destroy();
+            }
+        }
+    }
+
+    /**
+     * TreeIndex is used to hold the filter tuple values
+     *
+     * @param component
+     * @return
+     */
+    protected ITreeIndex getTreeIndex(ILSMDiskComponent component) {
+        return (ITreeIndex) getIndex(component);
+    }
+
+    protected abstract IIndex getIndex(ILSMDiskComponent component);
+
+    protected abstract BloomFilter getBloomFilter(ILSMDiskComponent component);
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentWithBuddyBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentWithBuddyBulkLoader.java
new file mode 100644
index 0000000..453d6cf
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentWithBuddyBulkLoader.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public abstract class AbstractLSMDiskComponentWithBuddyBulkLoader extends AbstractLSMDiskComponentBulkLoader {
+
+    protected final IIndexBulkLoader buddyBTreeBulkLoader;
+
+    //with filter
+    public AbstractLSMDiskComponentWithBuddyBulkLoader(ILSMDiskComponent component,
+            BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex, ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields,
+            MultiComparator filterCmp) throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager,
+                indexFields, filterFields, filterCmp);
+
+        // BuddyBTree must be created even if it could be empty,
+        // since without it the component is not considered as valid.
+        buddyBTreeBulkLoader =
+                getBuddyBTree(component).createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+    }
+
+    @Override
+    public void add(ITupleReference tuple) throws HyracksDataException {
+        try {
+            ITupleReference t;
+            if (indexTuple != null) {
+                indexTuple.reset(tuple);
+                t = indexTuple;
+            } else {
+                t = tuple;
+            }
+
+            indexBulkLoader.add(t);
+
+            if (filterTuple != null) {
+                filterTuple.reset(tuple);
+                component.getLSMComponentFilter().update(filterTuple, filterCmp);
+            }
+        } catch (Exception e) {
+            cleanupArtifacts();
+            throw e;
+        }
+        if (isEmptyComponent) {
+            isEmptyComponent = false;
+        }
+    }
+
+    public void delete(ITupleReference tuple) throws HyracksDataException {
+        try {
+            buddyBTreeBulkLoader.add(tuple);
+            if (bloomFilterBuilder != null) {
+                bloomFilterBuilder.add(tuple);
+            }
+        } catch (HyracksDataException e) {
+            //deleting a key multiple times is OK
+            if (e.getErrorCode() != ErrorCode.DUPLICATE_KEY) {
+                cleanupArtifacts();
+                throw e;
+            }
+        } catch (Exception e) {
+            cleanupArtifacts();
+            throw e;
+        }
+        if (isEmptyComponent) {
+            isEmptyComponent = false;
+        }
+    }
+
+    @Override
+    public void abort() throws HyracksDataException {
+        super.abort();
+        if (buddyBTreeBulkLoader != null) {
+            buddyBTreeBulkLoader.abort();
+        }
+    }
+
+    @Override
+    public void end() throws HyracksDataException {
+        if (!cleanedUpArtifacts) {
+            if (bloomFilterBuilder != null && !endedBloomFilterLoad) {
+                bloomFilterBuilder.end();
+                endedBloomFilterLoad = true;
+            }
+
+            //use filter
+            if (filterManager != null && component.getLSMComponentFilter() != null) {
+                filterManager.writeFilter(component.getLSMComponentFilter(), getTreeIndex(component));
+            }
+            indexBulkLoader.end();
+            buddyBTreeBulkLoader.end();
+
+            if (isEmptyComponent) {
+                cleanupArtifacts();
+            }
+        }
+    }
+
+    @Override
+    protected void cleanupArtifacts() throws HyracksDataException {
+        if (!cleanedUpArtifacts) {
+            cleanedUpArtifacts = true;
+            if (bloomFilterBuilder != null && !endedBloomFilterLoad) {
+                bloomFilterBuilder.abort();
+                endedBloomFilterLoad = true;
+            }
+            getIndex(component).deactivate();
+            getIndex(component).destroy();
+
+            getBuddyBTree(component).deactivate();
+            getBuddyBTree(component).destroy();
+
+            if (bloomFilterBuilder != null) {
+                getBloomFilter(component).deactivate();
+                getBloomFilter(component).destroy();
+            }
+        }
+    }
+
+    protected abstract ITreeIndex getBuddyBTree(ILSMDiskComponent component);
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 2363c43..f827b21 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -339,39 +339,14 @@
         // Create an inverted index instance to be bulk loaded.
         LSMInvertedIndexDiskComponent component = createDiskInvIndexComponent(componentFactory, flushOp.getTarget(),
                 flushOp.getDeletedKeysBTreeTarget(), flushOp.getBloomFilterTarget(), true);
-        IInvertedIndex diskInvertedIndex = component.getInvIndex();
 
         // Create a scan cursor on the BTree underlying the in-memory inverted index.
         LSMInvertedIndexMemoryComponent flushingComponent =
                 (LSMInvertedIndexMemoryComponent) flushOp.getFlushingComponent();
-        InMemoryInvertedIndexAccessor memInvIndexAccessor = (InMemoryInvertedIndexAccessor) flushingComponent
-                .getInvIndex().createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-        BTreeAccessor memBTreeAccessor = memInvIndexAccessor.getBTreeAccessor();
+
         RangePredicate nullPred = new RangePredicate(null, null, true, true, null, null);
-        IIndexCursor scanCursor = memBTreeAccessor.createSearchCursor(false);
-        memBTreeAccessor.search(scanCursor, nullPred);
 
-        // Bulk load the disk inverted index from the in-memory inverted index.
-        IIndexBulkLoader invIndexBulkLoader = diskInvertedIndex.createBulkLoader(1.0f, false, 0L, false);
-        try {
-            while (scanCursor.hasNext()) {
-                scanCursor.next();
-                invIndexBulkLoader.add(scanCursor.getTuple());
-            }
-        } finally {
-            scanCursor.close();
-        }
-        if (component.getLSMComponentFilter() != null) {
-            List<ITupleReference> filterTuples = new ArrayList<>();
-            filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple());
-            filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
-            getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples);
-            getFilterManager().writeFilter(component.getLSMComponentFilter(),
-                    ((OnDiskInvertedIndex) component.getInvIndex()).getBTree());
-        }
-        flushingComponent.getMetadata().copy(component.getMetadata());
-        invIndexBulkLoader.end();
-
+        // Search the deleted keys BTree to calculate the number of elements for BloomFilter
         IIndexAccessor deletedKeysBTreeAccessor = flushingComponent.getDeletedKeysBTree()
                 .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
         IIndexCursor btreeCountingCursor = ((BTreeAccessor) deletedKeysBTreeAccessor).createCountingSearchCursor();
@@ -387,33 +362,50 @@
             btreeCountingCursor.close();
         }
 
-        int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numBTreeTuples);
-        BloomFilterSpecification bloomFilterSpec =
-                BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
-
-        // Create an BTree instance for the deleted keys.
-        BTree diskDeletedKeysBTree = component.getDeletedKeysBTree();
+        IIndexBulkLoader componentBulkLoader =
+                createComponentBulkLoader(component, 1.0f, false, numBTreeTuples, false, false);
 
         // Create a scan cursor on the deleted keys BTree underlying the in-memory inverted index.
         IIndexCursor deletedKeysScanCursor = deletedKeysBTreeAccessor.createSearchCursor(false);
         deletedKeysBTreeAccessor.search(deletedKeysScanCursor, nullPred);
 
-        // Bulk load the deleted-keys BTree.
-        IIndexBulkLoader deletedKeysBTreeBulkLoader = diskDeletedKeysBTree.createBulkLoader(1.0f, false, 0L, false);
-        IIndexBulkLoader builder = component.getBloomFilter().createBuilder(numBTreeTuples,
-                bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
-
         try {
             while (deletedKeysScanCursor.hasNext()) {
                 deletedKeysScanCursor.next();
-                deletedKeysBTreeBulkLoader.add(deletedKeysScanCursor.getTuple());
-                builder.add(deletedKeysScanCursor.getTuple());
+                ((LSMInvertedIndexDiskComponentBulkLoader) componentBulkLoader)
+                        .delete(deletedKeysScanCursor.getTuple());
             }
         } finally {
             deletedKeysScanCursor.close();
-            builder.end();
         }
-        deletedKeysBTreeBulkLoader.end();
+
+        // Scan the in-memory inverted index
+        InMemoryInvertedIndexAccessor memInvIndexAccessor = (InMemoryInvertedIndexAccessor) flushingComponent
+                .getInvIndex().createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+        BTreeAccessor memBTreeAccessor = memInvIndexAccessor.getBTreeAccessor();
+        IIndexCursor scanCursor = memBTreeAccessor.createSearchCursor(false);
+        memBTreeAccessor.search(scanCursor, nullPred);
+
+        // Bulk load the disk inverted index from the in-memory inverted index.
+        try {
+            while (scanCursor.hasNext()) {
+                scanCursor.next();
+                componentBulkLoader.add(scanCursor.getTuple());
+            }
+        } finally {
+            scanCursor.close();
+        }
+        if (component.getLSMComponentFilter() != null) {
+            List<ITupleReference> filterTuples = new ArrayList<>();
+            filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple());
+            filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
+            filterManager.updateFilter(component.getLSMComponentFilter(), filterTuples);
+            filterManager.writeFilter(component.getLSMComponentFilter(),
+                    ((OnDiskInvertedIndex) component.getInvIndex()).getBTree());
+        }
+        flushingComponent.getMetadata().copy(component.getMetadata());
+
+        componentBulkLoader.end();
 
         return component;
     }
@@ -433,7 +425,7 @@
         LSMInvertedIndexDiskComponent component = createDiskInvIndexComponent(componentFactory, mergeOp.getTarget(),
                 mergeOp.getDeletedKeysBTreeTarget(), mergeOp.getBloomFilterTarget(), true);
 
-        IInvertedIndex mergedDiskInvertedIndex = component.getInvIndex();
+        IIndexBulkLoader componentBulkLoader;
 
         // In case we must keep the deleted-keys BTrees, then they must be merged *before* merging the inverted indexes so that
         // lsmHarness.endSearch() is called once when the inverted indexes have been merged.
@@ -445,44 +437,31 @@
                     new LSMInvertedIndexDeletedKeysBTreeMergeCursor(opCtx);
             search(opCtx, btreeCursor, mergePred);
 
-            BTree btree = component.getDeletedKeysBTree();
-            IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f, true, 0L, false);
-
             long numElements = 0L;
             for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) {
                 numElements += ((LSMInvertedIndexDiskComponent) mergeOp.getMergingComponents().get(i)).getBloomFilter()
                         .getNumElements();
             }
 
-            int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
-            BloomFilterSpecification bloomFilterSpec =
-                    BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
-            IIndexBulkLoader builder = component.getBloomFilter().createBuilder(numElements,
-                    bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
+            componentBulkLoader = createComponentBulkLoader(component, 1.0f, false, numElements, false, false);
             try {
                 while (btreeCursor.hasNext()) {
                     btreeCursor.next();
                     ITupleReference tuple = btreeCursor.getTuple();
-                    btreeBulkLoader.add(tuple);
-                    builder.add(tuple);
+                    ((LSMInvertedIndexDiskComponentBulkLoader) componentBulkLoader).delete(tuple);
                 }
             } finally {
                 btreeCursor.close();
-                builder.end();
             }
-            btreeBulkLoader.end();
         } else {
-            BTree btree = component.getDeletedKeysBTree();
-            IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f, true, 0L, false);
-            btreeBulkLoader.end();
+            componentBulkLoader = createComponentBulkLoader(component, 1.0f, false, 0L, false, false);
         }
 
-        IIndexBulkLoader invIndexBulkLoader = mergedDiskInvertedIndex.createBulkLoader(1.0f, true, 0L, false);
         try {
             while (cursor.hasNext()) {
                 cursor.next();
                 ITupleReference tuple = cursor.getTuple();
-                invIndexBulkLoader.add(tuple);
+                componentBulkLoader.add(tuple);
             }
         } finally {
             cursor.close();
@@ -503,12 +482,33 @@
             getFilterManager().writeFilter(component.getLSMComponentFilter(),
                     ((OnDiskInvertedIndex) component.getInvIndex()).getBTree());
         }
-        invIndexBulkLoader.end();
+
+        componentBulkLoader.end();
 
         return component;
     }
 
     @Override
+    public IIndexBulkLoader createComponentBulkLoader(ILSMDiskComponent component, float fillFactor,
+            boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter)
+            throws HyracksDataException {
+        BloomFilterSpecification bloomFilterSpec = null;
+        if (numElementsHint > 0) {
+            int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
+            bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
+        }
+        if (withFilter && filterFields != null) {
+            return new LSMInvertedIndexDiskComponentBulkLoader((LSMInvertedIndexDiskComponent) component,
+                    bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager,
+                    treeFields, filterFields,
+                    MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories()));
+        } else {
+            return new LSMInvertedIndexDiskComponentBulkLoader((LSMInvertedIndexDiskComponent) component,
+                    bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+        }
+    }
+
+    @Override
     public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
             throws HyracksDataException {
         return new LSMInvertedIndexBulkLoader(fillFactor, verifyInput, numElementsHint);
@@ -516,105 +516,35 @@
 
     public class LSMInvertedIndexBulkLoader implements IIndexBulkLoader {
         private final ILSMDiskComponent component;
-        private final IIndexBulkLoader invIndexBulkLoader;
-        private final IIndexBulkLoader deletedKeysBTreeBulkLoader;
-        private boolean cleanedUpArtifacts = false;
-        private boolean isEmptyComponent = true;
-        public final PermutingTupleReference indexTuple;
-        public final PermutingTupleReference filterTuple;
-        public final MultiComparator filterCmp;
+        private final IIndexBulkLoader componentBulkLoader;
 
         public LSMInvertedIndexBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
                 throws HyracksDataException {
             // Note that by using a flush target file name, we state that the
             // new bulk loaded tree is "newer" than any other merged tree.
             component = createBulkLoadTarget();
-            invIndexBulkLoader = ((LSMInvertedIndexDiskComponent) component).getInvIndex().createBulkLoader(fillFactor,
-                    verifyInput, numElementsHint, false);
 
-            //validity of the component depends on the deleted keys file being there even if it's empty.
-            deletedKeysBTreeBulkLoader = ((LSMInvertedIndexDiskComponent) component).getDeletedKeysBTree()
-                    .createBulkLoader(fillFactor, verifyInput, numElementsHint, false);
-
-            if (getFilterFields() != null) {
-                indexTuple = new PermutingTupleReference(getTreeFields());
-                filterCmp = MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories());
-                filterTuple = new PermutingTupleReference(getFilterFields());
-            } else {
-                indexTuple = null;
-                filterCmp = null;
-                filterTuple = null;
-            }
+            componentBulkLoader =
+                    createComponentBulkLoader(component, fillFactor, verifyInput, numElementsHint, false, true);
         }
 
         @Override
         public void add(ITupleReference tuple) throws HyracksDataException {
-            try {
-                ITupleReference t;
-                if (indexTuple != null) {
-                    indexTuple.reset(tuple);
-                    t = indexTuple;
-                } else {
-                    t = tuple;
-                }
-
-                invIndexBulkLoader.add(t);
-
-                if (filterTuple != null) {
-                    filterTuple.reset(tuple);
-                    component.getLSMComponentFilter().update(filterTuple, filterCmp);
-                }
-
-            } catch (Exception e) {
-                cleanupArtifacts();
-                throw e;
-            }
-            if (isEmptyComponent) {
-                isEmptyComponent = false;
-            }
-        }
-
-        protected void cleanupArtifacts() throws HyracksDataException {
-            if (!cleanedUpArtifacts) {
-                cleanedUpArtifacts = true;
-                ((LSMInvertedIndexDiskComponent) component).getInvIndex().deactivate();
-                ((LSMInvertedIndexDiskComponent) component).getInvIndex().destroy();
-                ((LSMInvertedIndexDiskComponent) component).getDeletedKeysBTree().deactivate();
-                ((LSMInvertedIndexDiskComponent) component).getDeletedKeysBTree().destroy();
-                ((LSMInvertedIndexDiskComponent) component).getBloomFilter().deactivate();
-                ((LSMInvertedIndexDiskComponent) component).getBloomFilter().destroy();
-            }
+            componentBulkLoader.add(tuple);
         }
 
         @Override
         public void end() throws HyracksDataException {
-            if (!cleanedUpArtifacts) {
-                if (component.getLSMComponentFilter() != null) {
-                    getFilterManager().writeFilter(component.getLSMComponentFilter(),
-                            ((OnDiskInvertedIndex) ((LSMInvertedIndexDiskComponent) component).getInvIndex())
-                                    .getBTree());
-                }
-                invIndexBulkLoader.end();
-                deletedKeysBTreeBulkLoader.end();
-
-                if (isEmptyComponent) {
-                    cleanupArtifacts();
-                } else {
-                    ioOpCallback.afterOperation(LSMOperationType.FLUSH, null, component);
-                    getLsmHarness().addBulkLoadedComponent(component);
-                }
+            componentBulkLoader.end();
+            if (component.getComponentSize() > 0) {
+                ioOpCallback.afterOperation(LSMOperationType.FLUSH, null, component);
+                lsmHarness.addBulkLoadedComponent(component);
             }
         }
 
         @Override
         public void abort() throws HyracksDataException {
-            if (invIndexBulkLoader != null) {
-                invIndexBulkLoader.abort();
-            }
-
-            if (deletedKeysBTreeBulkLoader != null) {
-                deletedKeysBTreeBulkLoader.abort();
-            }
+            componentBulkLoader.abort();
         }
 
         private ILSMDiskComponent createBulkLoadTarget() throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponentBulkLoader.java
new file mode 100644
index 0000000..38e1c42
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponentBulkLoader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.invertedindex.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentWithBuddyBulkLoader;
+import org.apache.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public class LSMInvertedIndexDiskComponentBulkLoader extends AbstractLSMDiskComponentWithBuddyBulkLoader {
+
+    //with filter
+    public LSMInvertedIndexDiskComponentBulkLoader(LSMInvertedIndexDiskComponent component,
+            BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex, ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields,
+            MultiComparator filterCmp) throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager,
+                indexFields, filterFields, filterCmp);
+    }
+
+    //without filter
+    public LSMInvertedIndexDiskComponentBulkLoader(LSMInvertedIndexDiskComponent component,
+            BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex) throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, null, null, null,
+                null);
+    }
+
+    @Override
+    protected BloomFilter getBloomFilter(ILSMDiskComponent component) {
+        return ((LSMInvertedIndexDiskComponent) component).getBloomFilter();
+    }
+
+    @Override
+    protected IIndex getIndex(ILSMDiskComponent component) {
+        return ((LSMInvertedIndexDiskComponent) component).getInvIndex();
+    }
+
+    @Override
+    protected ITreeIndex getTreeIndex(ILSMDiskComponent component) {
+        return ((OnDiskInvertedIndex) ((LSMInvertedIndexDiskComponent) component).getInvIndex()).getBTree();
+    }
+
+    @Override
+    protected ITreeIndex getBuddyBTree(ILSMDiskComponent component) {
+        return ((LSMInvertedIndexDiskComponent) component).getDeletedKeysBTree();
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 13ff420..3618737 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -71,6 +71,7 @@
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.MultiComparator;
 import org.apache.hyracks.storage.common.file.IFileMapProvider;
 
 public class LSMRTree extends AbstractLSMRTree {
@@ -173,49 +174,11 @@
         RTreeSearchCursor rtreeScanCursor = (RTreeSearchCursor) memRTreeAccessor.createSearchCursor(false);
         SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
         memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
+
         LSMRTreeDiskComponent component = createDiskComponent(componentFactory, flushOp.getTarget(),
                 flushOp.getBTreeTarget(), flushOp.getBloomFilterTarget(), true);
-        RTree diskRTree = component.getRTree();
-        IIndexBulkLoader rTreeBulkloader;
-        ITreeIndexCursor cursor;
 
-        IBinaryComparatorFactory[] linearizerArray = { linearizer };
-
-        TreeTupleSorter rTreeTupleSorter = new TreeTupleSorter(flushingComponent.getRTree().getFileId(),
-                linearizerArray, rtreeLeafFrameFactory.createFrame(), rtreeLeafFrameFactory.createFrame(),
-                flushingComponent.getRTree().getBufferCache(), comparatorFields);
-        // BulkLoad the tuples from the in-memory tree into the new disk
-        // RTree.
-
-        boolean isEmpty = true;
-        try {
-            while (rtreeScanCursor.hasNext()) {
-                isEmpty = false;
-                rtreeScanCursor.next();
-                rTreeTupleSorter.insertTupleEntry(rtreeScanCursor.getPageId(), rtreeScanCursor.getTupleOffset());
-            }
-        } finally {
-            rtreeScanCursor.close();
-        }
-        rTreeTupleSorter.sort();
-
-        rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false, 0L, false);
-        cursor = rTreeTupleSorter;
-
-        if (!isEmpty) {
-            try {
-                while (cursor.hasNext()) {
-                    cursor.next();
-                    ITupleReference frameTuple = cursor.getTuple();
-                    rTreeBulkloader.add(frameTuple);
-                }
-            } finally {
-                cursor.close();
-            }
-        }
-
-        rTreeBulkloader.end();
-
+        //count the number of tuples in the buddy btree
         ITreeIndexAccessor memBTreeAccessor = flushingComponent.getBTree()
                 .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
         RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null, null);
@@ -232,29 +195,55 @@
             btreeCountingCursor.close();
         }
 
-        int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numBTreeTuples);
-        BloomFilterSpecification bloomFilterSpec =
-                BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
+        IIndexBulkLoader componentBulkLoader =
+                createComponentBulkLoader(component, 1.0f, false, numBTreeTuples, false, false);
 
+        ITreeIndexCursor cursor;
+        IBinaryComparatorFactory[] linearizerArray = { linearizer };
+
+        TreeTupleSorter rTreeTupleSorter = new TreeTupleSorter(flushingComponent.getRTree().getFileId(),
+                linearizerArray, rtreeLeafFrameFactory.createFrame(), rtreeLeafFrameFactory.createFrame(),
+                flushingComponent.getRTree().getBufferCache(), comparatorFields);
+
+        // BulkLoad the tuples from the in-memory tree into the new disk
+        // RTree.
+        boolean isEmpty = true;
+        try {
+            while (rtreeScanCursor.hasNext()) {
+                isEmpty = false;
+                rtreeScanCursor.next();
+                rTreeTupleSorter.insertTupleEntry(rtreeScanCursor.getPageId(), rtreeScanCursor.getTupleOffset());
+            }
+        } finally {
+            rtreeScanCursor.close();
+        }
+        rTreeTupleSorter.sort();
+
+        cursor = rTreeTupleSorter;
+
+        if (!isEmpty) {
+            try {
+                while (cursor.hasNext()) {
+                    cursor.next();
+                    ITupleReference frameTuple = cursor.getTuple();
+                    componentBulkLoader.add(frameTuple);
+                }
+            } finally {
+                cursor.close();
+            }
+        }
+
+        // scan the memory BTree
         IIndexCursor btreeScanCursor = memBTreeAccessor.createSearchCursor(false);
         memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate);
-        BTree diskBTree = component.getBTree();
-
-        // BulkLoad the tuples from the in-memory tree into the new disk BTree.
-        IIndexBulkLoader bTreeBulkloader = diskBTree.createBulkLoader(1.0f, false, numBTreeTuples, false);
-        IIndexBulkLoader builder = component.getBloomFilter().createBuilder(numBTreeTuples,
-                bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
-        // scan the memory BTree
         try {
             while (btreeScanCursor.hasNext()) {
                 btreeScanCursor.next();
                 ITupleReference frameTuple = btreeScanCursor.getTuple();
-                bTreeBulkloader.add(frameTuple);
-                builder.add(frameTuple);
+                ((LSMRTreeDiskComponentBulkLoader) componentBulkLoader).delete(frameTuple);
             }
         } finally {
             btreeScanCursor.close();
-            builder.end();
         }
 
         if (component.getLSMComponentFilter() != null) {
@@ -266,7 +255,8 @@
         }
         // Note. If we change the filter to write to metadata object, we don't need the if block above
         flushingComponent.getMetadata().copy(component.getMetadata());
-        bTreeBulkloader.end();
+
+        componentBulkLoader.end();
         return component;
     }
 
@@ -282,40 +272,46 @@
         LSMRTreeDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getTarget(),
                 mergeOp.getBTreeTarget(), mergeOp.getBloomFilterTarget(), true);
 
+        IIndexBulkLoader componentBulkLoader;
+
         // In case we must keep the deleted-keys BTrees, then they must be merged *before* merging the r-trees so that
         // lsmHarness.endSearch() is called once when the r-trees have been merged.
-        BTree btree = mergedComponent.getBTree();
-        IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f, true, 0L, false);
         if (mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1) != diskComponents
                 .get(diskComponents.size() - 1)) {
             // Keep the deleted tuples since the oldest disk component is not included in the merge operation
 
-            LSMRTreeDeletedKeysBTreeMergeCursor btreeCursor = new LSMRTreeDeletedKeysBTreeMergeCursor(opCtx);
-            search(opCtx, btreeCursor, rtreeSearchPred);
-
             long numElements = 0L;
             for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) {
                 numElements += ((LSMRTreeDiskComponent) mergeOp.getMergingComponents().get(i)).getBloomFilter()
                         .getNumElements();
             }
+            componentBulkLoader = createComponentBulkLoader(mergedComponent, 1.0f, false, numElements, false, false);
 
-            int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
-            BloomFilterSpecification bloomFilterSpec =
-                    BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
-            IIndexBulkLoader builder = mergedComponent.getBloomFilter().createBuilder(numElements,
-                    bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
-
+            LSMRTreeDeletedKeysBTreeMergeCursor btreeCursor = new LSMRTreeDeletedKeysBTreeMergeCursor(opCtx);
+            search(opCtx, btreeCursor, rtreeSearchPred);
             try {
                 while (btreeCursor.hasNext()) {
                     btreeCursor.next();
                     ITupleReference tuple = btreeCursor.getTuple();
-                    btreeBulkLoader.add(tuple);
-                    builder.add(tuple);
+                    ((LSMRTreeDiskComponentBulkLoader) componentBulkLoader).delete(tuple);
                 }
             } finally {
                 btreeCursor.close();
-                builder.end();
             }
+        } else {
+            //no buddy-btree needed
+            componentBulkLoader = createComponentBulkLoader(mergedComponent, 1.0f, false, 0L, false, false);
+        }
+
+        //search old rtree components
+        try {
+            while (cursor.hasNext()) {
+                cursor.next();
+                ITupleReference frameTuple = cursor.getTuple();
+                componentBulkLoader.add(frameTuple);
+            }
+        } finally {
+            cursor.close();
         }
 
         if (mergedComponent.getLSMComponentFilter() != null) {
@@ -327,19 +323,8 @@
             getFilterManager().updateFilter(mergedComponent.getLSMComponentFilter(), filterTuples);
             getFilterManager().writeFilter(mergedComponent.getLSMComponentFilter(), mergedComponent.getRTree());
         }
-        btreeBulkLoader.end();
 
-        IIndexBulkLoader bulkLoader = mergedComponent.getRTree().createBulkLoader(1.0f, false, 0L, false);
-        try {
-            while (cursor.hasNext()) {
-                cursor.next();
-                ITupleReference frameTuple = cursor.getTuple();
-                bulkLoader.add(frameTuple);
-            }
-        } finally {
-            cursor.close();
-        }
-        bulkLoader.end();
+        componentBulkLoader.end();
 
         return mergedComponent;
     }
@@ -358,6 +343,25 @@
     }
 
     @Override
+    public IIndexBulkLoader createComponentBulkLoader(ILSMDiskComponent component, float fillFactor,
+            boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter)
+            throws HyracksDataException {
+        BloomFilterSpecification bloomFilterSpec = null;
+        if (numElementsHint > 0) {
+            int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
+            bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
+        }
+        if (withFilter && filterFields != null) {
+            return new LSMRTreeDiskComponentBulkLoader((LSMRTreeDiskComponent) component, bloomFilterSpec, fillFactor,
+                    verifyInput, numElementsHint, checkIfEmptyIndex, filterManager, treeFields, filterFields,
+                    MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories()));
+        } else {
+            return new LSMRTreeDiskComponentBulkLoader((LSMRTreeDiskComponent) component, bloomFilterSpec, fillFactor,
+                    verifyInput, numElementsHint, checkIfEmptyIndex);
+        }
+    }
+
+    @Override
     public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
             throws HyracksDataException {
         return new LSMRTreeBulkLoader(this, fillLevel, verifyInput, numElementsHint);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoader.java
index edc3e7d..fbdd37a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoader.java
@@ -20,110 +20,41 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
-import org.apache.hyracks.storage.common.MultiComparator;
 
 public class LSMRTreeBulkLoader implements IIndexBulkLoader {
     private final ILSMDiskComponent component;
-    private final IIndexBulkLoader bulkLoader;
-    private final IIndexBulkLoader buddyBTreeBulkloader;
-    private boolean cleanedUpArtifacts = false;
-    private boolean isEmptyComponent = true;
-    public final PermutingTupleReference indexTuple;
-    public final PermutingTupleReference filterTuple;
-    public final MultiComparator filterCmp;
     private final LSMRTree lsmIndex;
+    private final IIndexBulkLoader componentBulkLoader;
 
     public LSMRTreeBulkLoader(LSMRTree lsmIndex, float fillFactor, boolean verifyInput, long numElementsHint)
             throws HyracksDataException {
         this.lsmIndex = lsmIndex;
         // Note that by using a flush target file name, we state that the
         // new bulk loaded tree is "newer" than any other merged tree.
-        component = lsmIndex.createBulkLoadTarget();
-        bulkLoader = ((LSMRTreeDiskComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput,
-                numElementsHint, false);
-        buddyBTreeBulkloader = ((LSMRTreeDiskComponent) component).getBTree().createBulkLoader(fillFactor, verifyInput,
-                numElementsHint, false);
-        if (lsmIndex.getFilterFields() != null) {
-            indexTuple = new PermutingTupleReference(lsmIndex.getTreeFields());
-            filterCmp = MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories());
-            filterTuple = new PermutingTupleReference(lsmIndex.getFilterFields());
-        } else {
-            indexTuple = null;
-            filterCmp = null;
-            filterTuple = null;
-        }
+        this.component = lsmIndex.createBulkLoadTarget();
+        this.componentBulkLoader =
+                lsmIndex.createComponentBulkLoader(component, fillFactor, verifyInput, numElementsHint, false, true);
     }
 
     @Override
     public void add(ITupleReference tuple) throws HyracksDataException {
-        try {
-            ITupleReference t;
-            if (indexTuple != null) {
-                indexTuple.reset(tuple);
-                t = indexTuple;
-            } else {
-                t = tuple;
-            }
-
-            bulkLoader.add(t);
-
-            if (filterTuple != null) {
-                filterTuple.reset(tuple);
-                component.getLSMComponentFilter().update(filterTuple, filterCmp);
-            }
-        } catch (Exception e) {
-            cleanupArtifacts();
-            throw e;
-        }
-        if (isEmptyComponent) {
-            isEmptyComponent = false;
-        }
+        componentBulkLoader.add(tuple);
     }
 
     @Override
     public void end() throws HyracksDataException {
-        if (!cleanedUpArtifacts) {
-
-            if (component.getLSMComponentFilter() != null) {
-                lsmIndex.getFilterManager().writeFilter(component.getLSMComponentFilter(),
-                        ((LSMRTreeDiskComponent) component).getRTree());
-            }
-
-            bulkLoader.end();
-            buddyBTreeBulkloader.end();
-
-            if (isEmptyComponent) {
-                cleanupArtifacts();
-            } else {
-                lsmIndex.getIOOperationCallback().afterOperation(LSMOperationType.FLUSH, null, component);
-                lsmIndex.getLsmHarness().addBulkLoadedComponent(component);
-            }
+        componentBulkLoader.end();
+        if (component.getComponentSize() > 0) {
+            lsmIndex.getIOOperationCallback().afterOperation(LSMOperationType.FLUSH, null, component);
+            lsmIndex.getLsmHarness().addBulkLoadedComponent(component);
         }
     }
 
     @Override
     public void abort() throws HyracksDataException {
-        if (bulkLoader != null) {
-            bulkLoader.abort();
-        }
-        if (buddyBTreeBulkloader != null) {
-            buddyBTreeBulkloader.abort();
-        }
-    }
-
-    protected void cleanupArtifacts() throws HyracksDataException {
-        if (!cleanedUpArtifacts) {
-            cleanedUpArtifacts = true;
-            ((LSMRTreeDiskComponent) component).getRTree().deactivate();
-            ((LSMRTreeDiskComponent) component).getRTree().destroy();
-            ((LSMRTreeDiskComponent) component).getBTree().deactivate();
-            ((LSMRTreeDiskComponent) component).getBTree().destroy();
-            ((LSMRTreeDiskComponent) component).getBloomFilter().deactivate();
-            ((LSMRTreeDiskComponent) component).getBloomFilter().destroy();
-        }
+        componentBulkLoader.abort();
     }
 }
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponentBulkLoader.java
new file mode 100644
index 0000000..ff0a299
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponentBulkLoader.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.rtree.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentWithBuddyBulkLoader;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public class LSMRTreeDiskComponentBulkLoader extends AbstractLSMDiskComponentWithBuddyBulkLoader {
+
+    //with filter
+    public LSMRTreeDiskComponentBulkLoader(LSMRTreeDiskComponent component, BloomFilterSpecification bloomFilterSpec,
+            float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex,
+            ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields, MultiComparator filterCmp)
+            throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager,
+                indexFields, filterFields, filterCmp);
+    }
+
+    //without filter
+    public LSMRTreeDiskComponentBulkLoader(LSMRTreeDiskComponent component, BloomFilterSpecification bloomFilterSpec,
+            float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex)
+            throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, null, null, null,
+                null);
+    }
+
+    @Override
+    protected BloomFilter getBloomFilter(ILSMDiskComponent component) {
+        return ((LSMRTreeDiskComponent) component).getBloomFilter();
+    }
+
+    @Override
+    protected IIndex getIndex(ILSMDiskComponent component) {
+        return ((LSMRTreeDiskComponent) component).getRTree();
+    }
+
+    @Override
+    protected ITreeIndex getBuddyBTree(ILSMDiskComponent component) {
+        return ((LSMRTreeDiskComponent) component).getBTree();
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index a20e6f2..24c46d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -35,7 +35,6 @@
 import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
 import org.apache.hyracks.storage.am.lsm.common.api.IComponentFilterHelper;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
@@ -136,14 +135,7 @@
         SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
         memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
         LSMRTreeDiskComponent component = createDiskComponent(componentFactory, flushOp.getTarget(), null, null, true);
-        RTree diskRTree = component.getRTree();
-
-        // scan the memory BTree
-        ITreeIndexAccessor memBTreeAccessor = flushingComponent.getBTree()
-                .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-        BTreeRangeSearchCursor btreeScanCursor = (BTreeRangeSearchCursor) memBTreeAccessor.createSearchCursor(false);
-        RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null, null);
-        memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate);
+        IIndexBulkLoader componentBulkLoader = createComponentBulkLoader(component, 1.0f, false, 0L, false, false);
 
         // Since the LSM-RTree is used as a secondary assumption, the
         // primary key will be the last comparator in the BTree comparators
@@ -151,12 +143,6 @@
                 linearizerArray, rtreeLeafFrameFactory.createFrame(), rtreeLeafFrameFactory.createFrame(),
                 flushingComponent.getRTree().getBufferCache(), comparatorFields);
 
-        TreeTupleSorter bTreeTupleSorter = new TreeTupleSorter(flushingComponent.getBTree().getFileId(),
-                linearizerArray, btreeLeafFrameFactory.createFrame(), btreeLeafFrameFactory.createFrame(),
-                flushingComponent.getBTree().getBufferCache(), comparatorFields);
-        // BulkLoad the tuples from the in-memory tree into the new disk
-        // RTree.
-
         boolean isEmpty = true;
         try {
             while (rtreeScanCursor.hasNext()) {
@@ -171,6 +157,16 @@
             rTreeTupleSorter.sort();
         }
 
+        // scan the memory BTree
+        ITreeIndexAccessor memBTreeAccessor = flushingComponent.getBTree()
+                .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+        BTreeRangeSearchCursor btreeScanCursor = (BTreeRangeSearchCursor) memBTreeAccessor.createSearchCursor(false);
+        RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null, null);
+        memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate);
+        TreeTupleSorter bTreeTupleSorter = new TreeTupleSorter(flushingComponent.getBTree().getFileId(),
+                linearizerArray, btreeLeafFrameFactory.createFrame(), btreeLeafFrameFactory.createFrame(),
+                flushingComponent.getBTree().getBufferCache(), comparatorFields);
+
         isEmpty = true;
         try {
             while (btreeScanCursor.hasNext()) {
@@ -185,7 +181,6 @@
             bTreeTupleSorter.sort();
         }
 
-        IIndexBulkLoader rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false, 0L, false);
         LSMRTreeWithAntiMatterTuplesFlushCursor cursor = new LSMRTreeWithAntiMatterTuplesFlushCursor(rTreeTupleSorter,
                 bTreeTupleSorter, comparatorFields, linearizerArray);
         cursor.open(null, null);
@@ -195,7 +190,7 @@
                 cursor.next();
                 ITupleReference frameTuple = cursor.getTuple();
 
-                rTreeBulkloader.add(frameTuple);
+                componentBulkLoader.add(frameTuple);
             }
         } finally {
             cursor.close();
@@ -209,8 +204,8 @@
             getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getRTree());
         }
         flushingComponent.getMetadata().copy(component.getMetadata());
-        rTreeBulkloader.end();
 
+        componentBulkLoader.end();
         return component;
     }
 
@@ -225,13 +220,13 @@
 
         // Bulk load the tuples from all on-disk RTrees into the new RTree.
         LSMRTreeDiskComponent component = createDiskComponent(componentFactory, mergeOp.getTarget(), null, null, true);
-        RTree mergedRTree = component.getRTree();
-        IIndexBulkLoader bulkloader = mergedRTree.createBulkLoader(1.0f, false, 0L, false);
+
+        IIndexBulkLoader componentBulkLoader = createComponentBulkLoader(component, 1.0f, false, 0L, false, false);
         try {
             while (cursor.hasNext()) {
                 cursor.next();
                 ITupleReference frameTuple = cursor.getTuple();
-                bulkloader.add(frameTuple);
+                componentBulkLoader.add(frameTuple);
             }
         } finally {
             cursor.close();
@@ -245,7 +240,8 @@
             getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples);
             getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getRTree());
         }
-        bulkloader.end();
+
+        componentBulkLoader.end();
 
         return component;
     }
@@ -258,6 +254,20 @@
     }
 
     @Override
+    public IIndexBulkLoader createComponentBulkLoader(ILSMDiskComponent component, float fillFactor,
+            boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter)
+            throws HyracksDataException {
+        if (withFilter && filterFields != null) {
+            return new LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader((LSMRTreeDiskComponent) component, null,
+                    fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager, treeFields,
+                    filterFields, MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories()));
+        } else {
+            return new LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader((LSMRTreeDiskComponent) component, null,
+                    fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+        }
+    }
+
+    @Override
     public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
             throws HyracksDataException {
         return new LSMRTreeWithAntiMatterTuplesBulkLoader(fillLevel, verifyInput, numElementsHint);
@@ -265,91 +275,35 @@
 
     public class LSMRTreeWithAntiMatterTuplesBulkLoader implements IIndexBulkLoader {
         private final ILSMDiskComponent component;
-        private final IIndexBulkLoader bulkLoader;
-        private boolean cleanedUpArtifacts = false;
-        private boolean isEmptyComponent = true;
-        public final PermutingTupleReference indexTuple;
-        public final PermutingTupleReference filterTuple;
-        public final MultiComparator filterCmp;
+        private final IIndexBulkLoader componentBulkLoader;
 
         public LSMRTreeWithAntiMatterTuplesBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
                 throws HyracksDataException {
-            // Note that by using a flush target file name, we state that the
-            // new bulk loaded tree is "newer" than any other merged tree.
-
             component = createBulkLoadTarget();
-            bulkLoader = ((LSMRTreeDiskComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput,
-                    numElementsHint, false);
 
-            if (getFilterFields() != null) {
-                indexTuple = new PermutingTupleReference(getTreeFields());
-                filterCmp = MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories());
-                filterTuple = new PermutingTupleReference(getFilterFields());
-            } else {
-                indexTuple = null;
-                filterCmp = null;
-                filterTuple = null;
-            }
+            componentBulkLoader =
+                    createComponentBulkLoader(component, fillFactor, verifyInput, numElementsHint, false, true);
         }
 
         @Override
         public void add(ITupleReference tuple) throws HyracksDataException {
-            try {
-                ITupleReference t;
-                if (indexTuple != null) {
-                    indexTuple.reset(tuple);
-                    t = indexTuple;
-                } else {
-                    t = tuple;
-                }
-
-                bulkLoader.add(t);
-
-                if (filterTuple != null) {
-                    filterTuple.reset(tuple);
-                    component.getLSMComponentFilter().update(filterTuple, filterCmp);
-                }
-
-            } catch (Exception e) {
-                cleanupArtifacts();
-                throw e;
-            }
-            if (isEmptyComponent) {
-                isEmptyComponent = false;
-            }
+            componentBulkLoader.add(tuple);
         }
 
         @Override
         public void end() throws HyracksDataException {
-            if (!cleanedUpArtifacts) {
 
-                if (component.getLSMComponentFilter() != null) {
-                    getFilterManager().writeFilter(component.getLSMComponentFilter(),
-                            ((LSMRTreeDiskComponent) component).getRTree());
-                }
-                bulkLoader.end();
-
-                if (isEmptyComponent) {
-                    cleanupArtifacts();
-                } else {
-                    ioOpCallback.afterOperation(LSMOperationType.FLUSH, null, component);
-                    getLsmHarness().addBulkLoadedComponent(component);
-                }
+            componentBulkLoader.end();
+            if (component.getComponentSize() > 0) {
+                ioOpCallback.afterOperation(LSMOperationType.FLUSH, null, component);
+                lsmHarness.addBulkLoadedComponent(component);
             }
         }
 
         @Override
         public void abort() throws HyracksDataException {
-            if (bulkLoader != null) {
-                bulkLoader.abort();
-            }
-        }
-
-        protected void cleanupArtifacts() throws HyracksDataException {
-            if (!cleanedUpArtifacts) {
-                cleanedUpArtifacts = true;
-                ((LSMRTreeDiskComponent) component).getRTree().deactivate();
-                ((LSMRTreeDiskComponent) component).getRTree().destroy();
+            if (componentBulkLoader != null) {
+                componentBulkLoader.abort();
             }
         }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader.java
new file mode 100644
index 0000000..88a2e1e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.rtree.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentBulkLoader;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public class LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader extends AbstractLSMDiskComponentBulkLoader {
+
+    //with filter
+    public LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader(LSMRTreeDiskComponent component,
+            BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex, ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields,
+            MultiComparator filterCmp) throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager,
+                indexFields, filterFields, filterCmp);
+    }
+
+    //without filter
+    public LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader(LSMRTreeDiskComponent component,
+            BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex) throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, null, null, null,
+                null);
+    }
+
+    @Override
+    protected BloomFilter getBloomFilter(ILSMDiskComponent component) {
+        return ((LSMRTreeDiskComponent) component).getBloomFilter();
+    }
+
+    @Override
+    protected ITreeIndex getIndex(ILSMDiskComponent component) {
+        return ((LSMRTreeDiskComponent) component).getRTree();
+    }
+
+}