Integrated bloom filters with LSM-BTree during flushes, merges, and bulkload. All tests pass except the merge test due to what it seems a bug in the cleanup after merges if there are no search threads accessing the disk components. Next is to use bloom filters during search and also with other lsm indexes.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree_bloom_filter@2706 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsm-btree/pom.xml b/hyracks-storage-am-lsm-btree/pom.xml
index 17f3714..afef819 100644
--- a/hyracks-storage-am-lsm-btree/pom.xml
+++ b/hyracks-storage-am-lsm-btree/pom.xml
@@ -33,6 +33,13 @@
   	</dependency> 
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-storage-am-bloomfilter</artifactId>
+  		<version>0.2.2-SNAPSHOT</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency> 
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-storage-am-lsm-common</artifactId>
   		<version>0.2.2-SNAPSHOT</version>
   		<type>jar</type>
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 4cd4974..6d76a86 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -23,8 +23,12 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory;
 import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeBulkLoader;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
@@ -64,7 +68,6 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallbackWrapper;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMFlushOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
@@ -86,13 +89,16 @@
     private final ITreeIndexFrameFactory deleteLeafFrameFactory;
     private final IBinaryComparatorFactory[] cmpFactories;
 
+    private final int numHashes = 10;
+
     public LSMBTree(IInMemoryBufferCache memBufferCache, IInMemoryFreePageManager memFreePageManager,
             ITreeIndexFrameFactory interiorFrameFactory, ITreeIndexFrameFactory insertLeafFrameFactory,
             ITreeIndexFrameFactory deleteLeafFrameFactory, ILSMIndexFileManager fileManager,
             TreeIndexFactory<BTree> diskBTreeFactory, TreeIndexFactory<BTree> bulkLoadBTreeFactory,
-            IFileMapProvider diskFileMapProvider, int fieldCount, IBinaryComparatorFactory[] cmpFactories,
-            ILSMMergePolicy mergePolicy, ILSMOperationTrackerFactory opTrackerFactory,
-            ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackProvider ioOpCallbackProvider) {
+            BloomFilterFactory bloomFilterFactory, IFileMapProvider diskFileMapProvider, int fieldCount,
+            IBinaryComparatorFactory[] cmpFactories, ILSMMergePolicy mergePolicy,
+            ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationScheduler ioScheduler,
+            ILSMIOOperationCallbackProvider ioOpCallbackProvider) {
         super(memFreePageManager, diskBTreeFactory.getBufferCache(), fileManager, diskFileMapProvider, mergePolicy,
                 opTrackerFactory, ioScheduler, ioOpCallbackProvider);
         mutableComponent = new LSMBTreeMutableComponent(new BTree(memBufferCache,
@@ -102,8 +108,8 @@
         this.insertLeafFrameFactory = insertLeafFrameFactory;
         this.deleteLeafFrameFactory = deleteLeafFrameFactory;
         this.cmpFactories = cmpFactories;
-        componentFactory = new LSMBTreeImmutableComponentFactory(diskBTreeFactory);
-        bulkLoadComponentFactory = new LSMBTreeImmutableComponentFactory(bulkLoadBTreeFactory);
+        componentFactory = new LSMBTreeImmutableComponentFactory(diskBTreeFactory, bloomFilterFactory);
+        bulkLoadComponentFactory = new LSMBTreeImmutableComponentFactory(bulkLoadBTreeFactory, bloomFilterFactory);
     }
 
     @Override
@@ -135,14 +141,15 @@
             throw new HyracksDataException(e);
         }
         for (LSMComponentFileReferences lsmComonentFileReference : validFileReferences) {
-            LSMBTreeImmutableComponent btree;
+            LSMBTreeImmutableComponent component;
             try {
-                btree = createDiskComponent(componentFactory, lsmComonentFileReference.getInsertIndexFileReference(),
-                        false);
+                component = createDiskComponent(componentFactory,
+                        lsmComonentFileReference.getInsertIndexFileReference(),
+                        lsmComonentFileReference.getBloomFilterFileReference(), 0L, 0, false);
             } catch (IndexException e) {
                 throw new HyracksDataException(e);
             }
-            immutableComponents.add(btree);
+            immutableComponents.add(component);
         }
         isActivated = true;
     }
@@ -167,8 +174,11 @@
 
         List<ILSMComponent> immutableComponents = componentsRef.get();
         for (ILSMComponent c : immutableComponents) {
-            BTree btree = (BTree) ((LSMBTreeImmutableComponent) c).getBTree();
+            LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) c;
+            BTree btree = component.getBTree();
+            BloomFilter bloomFilter = component.getBloomFilter();
             btree.deactivate();
+            bloomFilter.deactivate();
         }
         mutableComponent.getBTree().deactivate();
         mutableComponent.getBTree().destroy();
@@ -189,8 +199,9 @@
 
         List<ILSMComponent> immutableComponents = componentsRef.get();
         for (ILSMComponent c : immutableComponents) {
-            BTree btree = (BTree) ((LSMBTreeImmutableComponent) c).getBTree();
-            btree.destroy();
+            LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) c;
+            component.getBTree().destroy();
+            component.getBloomFilter().destroy();
         }
         mutableComponent.getBTree().destroy();
         fileManager.deleteDirs();
@@ -205,9 +216,11 @@
         List<ILSMComponent> immutableComponents = componentsRef.get();
         mutableComponent.getBTree().clear();
         for (ILSMComponent c : immutableComponents) {
-            BTree btree = (BTree) ((LSMBTreeImmutableComponent) c).getBTree();
-            btree.deactivate();
-            btree.destroy();
+            LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) c;
+            component.getBloomFilter().deactivate();
+            component.getBTree().deactivate();
+            component.getBloomFilter().destroy();
+            component.getBTree().destroy();
         }
         immutableComponents.clear();
     }
@@ -346,24 +359,44 @@
         opCtx.setOperation(IndexOperation.FLUSH);
         opCtx.getComponentHolder().add(flushingComponent);
         ILSMIndexAccessorInternal flushAccessor = new LSMBTreeAccessor(lsmHarness, opCtx);
-        ioScheduler.scheduleOperation(new LSMFlushOperation(flushAccessor, flushingComponent, componentFileRefs
-                .getInsertIndexFileReference(), callback));
+        ioScheduler.scheduleOperation(new LSMBTreeFlushOperation(flushAccessor, flushingComponent, componentFileRefs
+                .getInsertIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), callback));
     }
 
     @Override
     public ILSMComponent flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
-        LSMFlushOperation flushOp = (LSMFlushOperation) operation;
+        LSMBTreeFlushOperation flushOp = (LSMBTreeFlushOperation) operation;
         LSMBTreeMutableComponent flushingComponent = (LSMBTreeMutableComponent) flushOp.getFlushingComponent();
         IIndexAccessor accessor = flushingComponent.getBTree().createAccessor(NoOpOperationCallback.INSTANCE,
                 NoOpOperationCallback.INSTANCE);
-        IIndexCursor scanCursor = accessor.createSearchCursor();
+
         RangePredicate nullPred = new RangePredicate(null, null, true, true, null, null);
-        accessor.search(scanCursor, nullPred);
-        LSMBTreeImmutableComponent component = createDiskComponent(componentFactory, flushOp.getFlushTarget(), true);
+        IIndexCursor countingCursor = ((BTreeAccessor) accessor).createCountingSearchCursor();
+        accessor.search(countingCursor, nullPred);
+        long numElements = 0L;
+        try {
+            while (countingCursor.hasNext()) {
+                countingCursor.next();
+                ITupleReference countTuple = countingCursor.getTuple();
+                numElements = IntegerSerializerDeserializer.getInt(countTuple.getFieldData(0),
+                        countTuple.getFieldStart(0));
+            }
+        } finally {
+            countingCursor.close();
+        }
+
+        LSMBTreeImmutableComponent component = createDiskComponent(componentFactory, flushOp.getBTreeFlushTarget(),
+                flushOp.getBloomFilterFlushTarget(), numElements, numHashes, true);
         IIndexBulkLoader bulkLoader = component.getBTree().createBulkLoader(1.0f, false);
+        BloomFilter bf = component.getBloomFilter();
+        long[] hashes = new long[2];
+
+        IIndexCursor scanCursor = accessor.createSearchCursor();
+        accessor.search(scanCursor, nullPred);
         try {
             while (scanCursor.hasNext()) {
                 scanCursor.next();
+                bf.add(scanCursor.getTuple(), hashes);
                 bulkLoader.add(scanCursor.getTuple());
             }
         } finally {
@@ -392,7 +425,7 @@
                 .getName(), lastFile.getFile().getName());
         ILSMIndexAccessorInternal accessor = new LSMBTreeAccessor(lsmHarness, opCtx);
         ioScheduler.scheduleOperation(new LSMBTreeMergeOperation(accessor, mergingComponents, cursor, relMergeFileRefs
-                .getInsertIndexFileReference(), callback));
+                .getInsertIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), callback));
     }
 
     @Override
@@ -401,12 +434,24 @@
         LSMBTreeMergeOperation mergeOp = (LSMBTreeMergeOperation) operation;
         ITreeIndexCursor cursor = mergeOp.getCursor();
         mergedComponents.addAll(mergeOp.getMergingComponents());
-        LSMBTreeImmutableComponent mergedBTree = createDiskComponent(componentFactory, mergeOp.getMergeTarget(), true);
+
+        long numElements = 0L;
+        for (int i = 0; i < mergedComponents.size(); ++i) {
+            numElements += ((LSMBTreeImmutableComponent) mergedComponents.get(i)).getBloomFilter().getNumElements();
+        }
+
+        LSMBTreeImmutableComponent mergedBTree = createDiskComponent(componentFactory, mergeOp.getBTreeMergeTarget(),
+                mergeOp.getBloomFilterMergeTarget(), numElements, numHashes, true);
+
+        BloomFilter bf = mergedBTree.getBloomFilter();
+        long[] hashes = new long[2];
+
         IIndexBulkLoader bulkLoader = mergedBTree.getBTree().createBulkLoader(1.0f, false);
         try {
             while (cursor.hasNext()) {
                 cursor.next();
                 ITupleReference frameTuple = cursor.getTuple();
+                bf.add(frameTuple, hashes);
                 bulkLoader.add(frameTuple);
             }
         } finally {
@@ -417,15 +462,18 @@
     }
 
     private LSMBTreeImmutableComponent createDiskComponent(LSMBTreeImmutableComponentFactory factory,
-            FileReference fileRef, boolean createComponent) throws HyracksDataException, IndexException {
+            FileReference btreeFileRef, FileReference bloomFilterFileRef, long numElements, int numHashes,
+            boolean createComponent) throws HyracksDataException, IndexException {
         // Create new BTree instance.
         LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) factory
-                .createLSMComponentInstance(new LSMComponentFileReferences(fileRef, null));
+                .createLSMComponentInstance(new LSMComponentFileReferences(btreeFileRef, null, bloomFilterFileRef));
         if (createComponent) {
             component.getBTree().create();
+            component.getBloomFilter().create(numElements, numHashes);
         }
         // BTree will be closed during cleanup of merge().
         component.getBTree().activate();
+        component.getBloomFilter().activate();
         return component;
     }
 
@@ -434,25 +482,33 @@
         return new LSMBTreeBulkLoader(fillLevel, verifyInput);
     }
 
-    private ILSMComponent createBulkLoadTarget() throws HyracksDataException, IndexException {
-        LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
-        return createDiskComponent(bulkLoadComponentFactory, componentFileRefs.getInsertIndexFileReference(), true);
-    }
-
     @Override
     public void markAsValid(ILSMComponent lsmComponent) throws HyracksDataException {
-        BTree btree = ((LSMBTreeImmutableComponent) lsmComponent).getBTree();
-        forceFlushDirtyPages(btree);
-        markAsValidInternal(btree);
+        // The order of forcing the dirty page to be flushed is critical. The bloom filter must be always done first.
+        LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) lsmComponent;
+        int fileId = component.getBloomFilter().getFileId();
+        IBufferCache bufferCache = component.getBTree().getBufferCache();
+        int startPage = 0;
+        int maxPage = component.getBloomFilter().getNumPages();
+        forceFlushDirtyPages(bufferCache, fileId, startPage, maxPage);
+        forceFlushDirtyPages(component.getBTree());
+        markAsValidInternal(component.getBTree());
     }
 
     public class LSMBTreeBulkLoader implements IIndexBulkLoader {
         private final ILSMComponent component;
         private final BTreeBulkLoader bulkLoader;
+        private long numElements;
 
         public LSMBTreeBulkLoader(float fillFactor, boolean verifyInput) throws TreeIndexException {
             try {
-                component = createBulkLoadTarget();
+                numElements = 0L;
+                LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
+                component = (LSMBTreeImmutableComponent) bulkLoadComponentFactory
+                        .createLSMComponentInstance(new LSMComponentFileReferences(componentFileRefs
+                                .getInsertIndexFileReference(), null, componentFileRefs.getBloomFilterFileReference()));
+                ((LSMBTreeImmutableComponent) component).getBTree().create();
+                ((LSMBTreeImmutableComponent) component).getBTree().activate();
             } catch (HyracksDataException e) {
                 throw new TreeIndexException(e);
             } catch (IndexException e) {
@@ -466,6 +522,7 @@
         public void add(ITupleReference tuple) throws IndexException, HyracksDataException {
             try {
                 bulkLoader.add(tuple);
+                ++numElements;
             } catch (IndexException e) {
                 handleException();
                 throw e;
@@ -486,6 +543,25 @@
         @Override
         public void end() throws HyracksDataException, IndexException {
             bulkLoader.end();
+            IIndexAccessor accessor = ((LSMBTreeImmutableComponent) component).getBTree().createAccessor(
+                    NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            IIndexCursor scanCursor = accessor.createSearchCursor();
+            RangePredicate nullPred = new RangePredicate(null, null, true, true, null, null);
+            accessor.search(scanCursor, nullPred);
+            long[] hashes = new long[2];
+            int fileid1 = ((LSMBTreeImmutableComponent) component).getBTree().getFileId();
+            int fileid2 = ((LSMBTreeImmutableComponent) component).getBloomFilter().getFileId();
+            BloomFilter bf = ((LSMBTreeImmutableComponent) component).getBloomFilter();
+            bf.create(numElements, numHashes);
+            bf.activate();
+            try {
+                while (scanCursor.hasNext()) {
+                    scanCursor.next();
+                    bf.add(scanCursor.getTuple(), hashes);
+                }
+            } finally {
+                scanCursor.close();
+            }
             lsmHarness.addBulkLoadedComponent(component);
         }
 
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
new file mode 100644
index 0000000..61d956d
--- /dev/null
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
@@ -0,0 +1,202 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+public class LSMBTreeFileManager extends AbstractLSMIndexFileManager {
+    private static final String BTREE_STRING = "b";
+    private static final String BLOOM_FILTER_STRING = "f";
+
+    private final TreeIndexFactory<? extends ITreeIndex> btreeFactory;
+
+    public LSMBTreeFileManager(IIOManager ioManager, IFileMapProvider fileMapProvider, FileReference file,
+            TreeIndexFactory<? extends ITreeIndex> btreeFactory, int startIODeviceIndex) {
+        super(ioManager, fileMapProvider, file, null, startIODeviceIndex);
+        this.btreeFactory = btreeFactory;
+    }
+
+    protected void cleanupAndGetValidFilesInternal(IODeviceHandle dev, FilenameFilter filter,
+            TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<ComparableFileName> allFiles)
+            throws HyracksDataException, IndexException {
+        File dir = new File(dev.getPath(), baseDir);
+        String[] files = dir.list(filter);
+        for (String fileName : files) {
+            File file = new File(dir.getPath() + File.separator + fileName);
+            FileReference fileRef = new FileReference(file);
+            if (treeFactory == null || isValidTreeIndex(treeFactory.createIndexInstance(fileRef))) {
+                allFiles.add(new ComparableFileName(fileRef));
+            } else {
+                file.delete();
+            }
+        }
+    }
+
+    @Override
+    public LSMComponentFileReferences getRelFlushFileReference() {
+        Date date = new Date();
+        String ts = formatter.format(date);
+        String baseName = baseDir + ts + SPLIT_STRING + ts;
+        // Begin timestamp and end timestamp are identical since it is a flush
+        return new LSMComponentFileReferences(createFlushFile(baseName + SPLIT_STRING + BTREE_STRING), null,
+                createFlushFile(baseName + SPLIT_STRING + BLOOM_FILTER_STRING));
+    }
+
+    @Override
+    public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName)
+            throws HyracksDataException {
+        String[] firstTimestampRange = firstFileName.split(SPLIT_STRING);
+        String[] lastTimestampRange = lastFileName.split(SPLIT_STRING);
+
+        String baseName = baseDir + firstTimestampRange[0] + SPLIT_STRING + lastTimestampRange[1];
+        // Get the range of timestamps by taking the earliest and the latest timestamps
+        return new LSMComponentFileReferences(createMergeFile(baseName + SPLIT_STRING + BTREE_STRING), null,
+                createMergeFile(baseName + SPLIT_STRING + BLOOM_FILTER_STRING));
+    }
+
+    private static FilenameFilter btreeFilter = new FilenameFilter() {
+        public boolean accept(File dir, String name) {
+            return !name.startsWith(".") && name.endsWith(BTREE_STRING);
+        }
+    };
+
+    private static FilenameFilter bloomFilterFilter = new FilenameFilter() {
+        public boolean accept(File dir, String name) {
+            return !name.startsWith(".") && name.endsWith(BLOOM_FILTER_STRING);
+        }
+    };
+
+    @Override
+    public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException, IndexException {
+        List<LSMComponentFileReferences> validFiles = new ArrayList<LSMComponentFileReferences>();
+        ArrayList<ComparableFileName> allBTreeFiles = new ArrayList<ComparableFileName>();
+        ArrayList<ComparableFileName> allBloomFilterFiles = new ArrayList<ComparableFileName>();
+
+        // Gather files from all IODeviceHandles.
+        for (IODeviceHandle dev : ioManager.getIODevices()) {
+            cleanupAndGetValidFilesInternal(dev, bloomFilterFilter, null, allBloomFilterFiles);
+            HashSet<String> bloomFilterFilesSet = new HashSet<String>();
+            for (ComparableFileName cmpFileName : allBloomFilterFiles) {
+                int index = cmpFileName.fileName.lastIndexOf(SPLIT_STRING);
+                bloomFilterFilesSet.add(cmpFileName.fileName.substring(0, index));
+            }
+            // List of valid BTree files that may or may not have a bloom filter buddy. Will check for buddies below.
+            ArrayList<ComparableFileName> tmpAllBTreeFiles = new ArrayList<ComparableFileName>();
+            cleanupAndGetValidFilesInternal(dev, btreeFilter, btreeFactory, tmpAllBTreeFiles);
+            // Look for buddy bloom filters for all valid BTrees. 
+            // If no buddy is found, delete the file, otherwise add the BTree to allBTreeFiles. 
+            for (ComparableFileName cmpFileName : tmpAllBTreeFiles) {
+                int index = cmpFileName.fileName.lastIndexOf(SPLIT_STRING);
+                String file = cmpFileName.fileName.substring(0, index);
+                if (bloomFilterFilesSet.contains(file)) {
+                    allBTreeFiles.add(cmpFileName);
+                } else {
+                    // Couldn't find the corresponding bloom filter file; thus, delete
+                    // the BTree file.
+                    File invalidBTreeFile = new File(cmpFileName.fullPath);
+                    invalidBTreeFile.delete();
+                }
+            }
+        }
+        // Sanity check.
+        if (allBTreeFiles.size() != allBloomFilterFiles.size()) {
+            throw new HyracksDataException(
+                    "Unequal number of valid BTree and bloom filter files found. Aborting cleanup.");
+        }
+
+        // Trivial cases.
+        if (allBTreeFiles.isEmpty() || allBloomFilterFiles.isEmpty()) {
+            return validFiles;
+        }
+
+        if (allBTreeFiles.size() == 1 && allBloomFilterFiles.size() == 1) {
+            validFiles.add(new LSMComponentFileReferences(allBTreeFiles.get(0).fileRef,
+                    allBloomFilterFiles.get(0).fileRef, null));
+            return validFiles;
+        }
+
+        // Sorts files names from earliest to latest timestamp.
+        Collections.sort(allBTreeFiles);
+        Collections.sort(allBloomFilterFiles);
+
+        List<ComparableFileName> validComparableBTreeFiles = new ArrayList<ComparableFileName>();
+        ComparableFileName lastBTree = allBTreeFiles.get(0);
+        validComparableBTreeFiles.add(lastBTree);
+
+        List<ComparableFileName> validComparableBloomFilterFiles = new ArrayList<ComparableFileName>();
+        ComparableFileName lastBloomFilter = allBloomFilterFiles.get(0);
+        validComparableBloomFilterFiles.add(lastBloomFilter);
+
+        for (int i = 1; i < allBTreeFiles.size(); i++) {
+            ComparableFileName currentBTree = allBTreeFiles.get(i);
+            ComparableFileName currentBloomFilter = allBloomFilterFiles.get(i);
+            // Current start timestamp is greater than last stop timestamp.
+            if (currentBTree.interval[0].compareTo(lastBTree.interval[1]) > 0
+                    && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[1]) > 0) {
+                validComparableBTreeFiles.add(currentBTree);
+                validComparableBloomFilterFiles.add(currentBloomFilter);
+                lastBTree = currentBTree;
+                lastBloomFilter = currentBloomFilter;
+            } else if (currentBTree.interval[0].compareTo(lastBTree.interval[0]) >= 0
+                    && currentBTree.interval[1].compareTo(lastBTree.interval[1]) <= 0
+                    && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[0]) >= 0
+                    && currentBloomFilter.interval[1].compareTo(lastBloomFilter.interval[1]) <= 0) {
+                // Invalid files are completely contained in last interval.
+                File invalidBTreeFile = new File(currentBTree.fullPath);
+                invalidBTreeFile.delete();
+                File invalidBloomFilterFile = new File(currentBloomFilter.fullPath);
+                invalidBloomFilterFile.delete();
+            } else {
+                // This scenario should not be possible.
+                throw new HyracksDataException("Found LSM files with overlapping but not contained timetamp intervals.");
+            }
+        }
+
+        // Sort valid files in reverse lexicographical order, such that newer
+        // files come first.
+        Collections.sort(validComparableBTreeFiles, recencyCmp);
+        Collections.sort(validComparableBloomFilterFiles, recencyCmp);
+
+        Iterator<ComparableFileName> btreeFileIter = validComparableBTreeFiles.iterator();
+        Iterator<ComparableFileName> bloomFilterFileIter = validComparableBloomFilterFiles.iterator();
+        while (btreeFileIter.hasNext() && bloomFilterFileIter.hasNext()) {
+            ComparableFileName cmpBTreeFileName = btreeFileIter.next();
+            ComparableFileName cmpBloomFilterFileName = bloomFilterFileIter.next();
+            validFiles.add(new LSMComponentFileReferences(cmpBTreeFileName.fileRef, null,
+                    cmpBloomFilterFileName.fileRef));
+        }
+
+        return validFiles;
+    }
+}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
new file mode 100644
index 0000000..dfda07b
--- /dev/null
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
@@ -0,0 +1,71 @@
+package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
+
+public class LSMBTreeFlushOperation implements ILSMIOOperation {
+
+    private final ILSMIndexAccessorInternal accessor;
+    private final ILSMComponent flushingComponent;
+    private final FileReference btreeFlushTarget;
+    private final FileReference bloomFilterFlushTarget;
+    private final ILSMIOOperationCallback callback;
+
+    public LSMBTreeFlushOperation(ILSMIndexAccessorInternal accessor, ILSMComponent flushingComponent,
+            FileReference btreeFlushTarget, FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback) {
+        this.accessor = accessor;
+        this.flushingComponent = flushingComponent;
+        this.btreeFlushTarget = btreeFlushTarget;
+        this.bloomFilterFlushTarget = bloomFilterFlushTarget;
+        this.callback = callback;
+    }
+
+    @Override
+    public Set<IODeviceHandle> getReadDevices() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<IODeviceHandle> getWriteDevices() {
+        Set<IODeviceHandle> devs = new HashSet<IODeviceHandle>();
+        devs.add(btreeFlushTarget.getDeviceHandle());
+        devs.add(bloomFilterFlushTarget.getDeviceHandle());
+        return devs;
+    }
+
+    @Override
+    public void perform() throws HyracksDataException, IndexException {
+        accessor.flush(this);
+    }
+
+    @Override
+    public ILSMIOOperationCallback getCallback() {
+        return callback;
+    }
+
+    public FileReference getBTreeFlushTarget() {
+        return btreeFlushTarget;
+    }
+
+    public FileReference getBloomFilterFlushTarget() {
+        return bloomFilterFlushTarget;
+    }
+
+    public ILSMIndexAccessorInternal getAccessor() {
+        return accessor;
+    }
+
+    public ILSMComponent getFlushingComponent() {
+        return flushingComponent;
+    }
+}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponent.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponent.java
index 2251a49..7b5e95d 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponent.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponent.java
@@ -1,24 +1,33 @@
 package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilter;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractImmutableLSMComponent;
 
 public class LSMBTreeImmutableComponent extends AbstractImmutableLSMComponent {
 
     private final BTree btree;
+    private final BloomFilter bloomFilter;
 
-    public LSMBTreeImmutableComponent(BTree btree) {
+    public LSMBTreeImmutableComponent(BTree btree, BloomFilter bloomFilter) {
         this.btree = btree;
+        this.bloomFilter = bloomFilter;
     }
 
     @Override
     public void destroy() throws HyracksDataException {
         btree.deactivate();
         btree.destroy();
+        bloomFilter.deactivate();
+        bloomFilter.destroy();
     }
 
     public BTree getBTree() {
         return btree;
     }
+
+    public BloomFilter getBloomFilter() {
+        return bloomFilter;
+    }
 }
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponentFactory.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponentFactory.java
index 696fc2c..998072f 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponentFactory.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponentFactory.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -25,14 +27,18 @@
 
 public class LSMBTreeImmutableComponentFactory implements ILSMComponentFactory {
     private final TreeIndexFactory<BTree> btreeFactory;
+    private final BloomFilterFactory bloomFilterFactory;
 
-    public LSMBTreeImmutableComponentFactory(TreeIndexFactory<BTree> btreeFactory) {
+    public LSMBTreeImmutableComponentFactory(TreeIndexFactory<BTree> btreeFactory, BloomFilterFactory bloomFilterFactory) {
         this.btreeFactory = btreeFactory;
+        this.bloomFilterFactory = bloomFilterFactory;
     }
 
     @Override
-    public ILSMComponent createLSMComponentInstance(LSMComponentFileReferences cfr) throws IndexException {
-        return new LSMBTreeImmutableComponent(btreeFactory.createIndexInstance(cfr.getInsertIndexFileReference()));
+    public ILSMComponent createLSMComponentInstance(LSMComponentFileReferences cfr) throws IndexException,
+            HyracksDataException {
+        return new LSMBTreeImmutableComponent(btreeFactory.createIndexInstance(cfr.getInsertIndexFileReference()),
+                bloomFilterFactory.createBloomFiltertInstance(cfr.getBloomFilterFileReference()));
     }
 
     @Override
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
index a3a7097..ddfe365 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
@@ -15,7 +15,6 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
 
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -35,15 +34,18 @@
     private final ILSMIndexAccessorInternal accessor;
     private final List<ILSMComponent> mergingComponents;
     private final ITreeIndexCursor cursor;
-    private final FileReference mergeTarget;
+    private final FileReference btreeMergeTarget;
+    private final FileReference bloomFilterMergeTarget;
     private final ILSMIOOperationCallback callback;
 
     public LSMBTreeMergeOperation(ILSMIndexAccessorInternal accessor, List<ILSMComponent> mergingComponents,
-            ITreeIndexCursor cursor, FileReference mergeTarget, ILSMIOOperationCallback callback) {
+            ITreeIndexCursor cursor, FileReference btreeMergeTarget, FileReference bloomFilterMergeTarget,
+            ILSMIOOperationCallback callback) {
         this.accessor = accessor;
         this.mergingComponents = mergingComponents;
         this.cursor = cursor;
-        this.mergeTarget = mergeTarget;
+        this.btreeMergeTarget = btreeMergeTarget;
+        this.bloomFilterMergeTarget = bloomFilterMergeTarget;
         this.callback = callback;
     }
 
@@ -59,7 +61,10 @@
 
     @Override
     public Set<IODeviceHandle> getWriteDevices() {
-        return Collections.singleton(mergeTarget.getDeviceHandle());
+        Set<IODeviceHandle> devs = new HashSet<IODeviceHandle>();
+        devs.add(btreeMergeTarget.getDeviceHandle());
+        devs.add(bloomFilterMergeTarget.getDeviceHandle());
+        return devs;
     }
 
     @Override
@@ -72,8 +77,12 @@
         return callback;
     }
 
-    public FileReference getMergeTarget() {
-        return mergeTarget;
+    public FileReference getBTreeMergeTarget() {
+        return btreeMergeTarget;
+    }
+
+    public FileReference getBloomFilterMergeTarget() {
+        return bloomFilterMergeTarget;
     }
 
     public ITreeIndexCursor getCursor() {
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java
index 3706230..154ecf6 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java
@@ -19,6 +19,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
@@ -29,6 +30,7 @@
 import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.freepage.LinkedListFreePageManagerFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTreeFileManager;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.tuples.LSMBTreeCopyTupleWriterFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleWriterFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.IInMemoryBufferCache;
@@ -38,7 +40,6 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexFileManager;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -80,13 +81,20 @@
                 typeTraits.length);
         TreeIndexFactory<BTree> bulkLoadBTreeFactory = new BTreeFactory(diskBufferCache, diskFileMapProvider,
                 freePageManagerFactory, interiorFrameFactory, insertLeafFrameFactory, cmpFactories, typeTraits.length);
-        ILSMIndexFileManager fileNameManager = new LSMIndexFileManager(ioManager, diskFileMapProvider, file,
+
+        int keyFields[] = new int[cmpFactories.length];
+        for (int i = 0; i < cmpFactories.length; i++) {
+            keyFields[i] = i;
+        }
+        BloomFilterFactory bloomFilterFactory = new BloomFilterFactory(diskBufferCache, diskFileMapProvider, keyFields);
+
+        ILSMIndexFileManager fileNameManager = new LSMBTreeFileManager(ioManager, diskFileMapProvider, file,
                 diskBTreeFactory, startIODeviceIndex);
 
         LSMBTree lsmTree = new LSMBTree(memBufferCache, memFreePageManager, interiorFrameFactory,
                 insertLeafFrameFactory, deleteLeafFrameFactory, fileNameManager, diskBTreeFactory,
-                bulkLoadBTreeFactory, diskFileMapProvider, typeTraits.length, cmpFactories, mergePolicy,
-                opTrackerFactory, ioScheduler, ioOpCallbackProvider);
+                bulkLoadBTreeFactory, bloomFilterFactory, diskFileMapProvider, typeTraits.length, cmpFactories,
+                mergePolicy, opTrackerFactory, ioScheduler, ioOpCallbackProvider);
         return lsmTree;
     }
 }