merged hyracks_lsm_tree into branch -r2707:2751

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree_bloom_filter@2752 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
index e55882a..56d7837 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
@@ -152,7 +152,7 @@
         IIndexDataflowHelperFactory dataflowHelperFactory = new BTreeDataflowHelperFactory();
         TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
                 storageManager, lcManagerProvider, btreeSplitProvider, typeTraits, comparatorFactories,
-                fieldPermutation, 0.7f, false, dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+                fieldPermutation, 0.7f, false, 1000L, dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
         JobHelper.createPartitionConstraint(spec, btreeBulkLoad, splitNCs);
 
         // distribute the records from the datagen via hashing to the bulk load
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
index 3a4e59f..d7480ec 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
@@ -144,7 +144,7 @@
         IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.secondaryBTreeName);
         TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
                 storageManager, lcManagerProvider, btreeSplitProvider, secondaryTypeTraits, comparatorFactories,
-                fieldPermutation, 0.7f, false, dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+                fieldPermutation, 0.7f, false, 1000L, dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
         JobHelper.createPartitionConstraint(spec, btreeBulkLoad, splitNCs);
 
         // connect the ops
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
index 739f107..7e69d4d 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
@@ -172,7 +172,7 @@
         int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
         TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
                 storageManager, lcManagerProvider, primarySplitProvider, primaryTypeTraits, primaryComparatorFactories,
-                fieldPermutation, 0.7f, true, dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+                fieldPermutation, 0.7f, true, 1000L, dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -234,7 +234,7 @@
         int[] fieldPermutation = { 3, 0 };
         TreeIndexBulkLoadOperatorDescriptor secondaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
                 storageManager, lcManagerProvider, secondarySplitProvider, secondaryTypeTraits,
-                secondaryComparatorFactories, fieldPermutation, 0.7f, true, dataflowHelperFactory,
+                secondaryComparatorFactories, fieldPermutation, 0.7f, true, 1000L, dataflowHelperFactory,
                 NoOpOperationCallbackFactory.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBtreeBulkLoad, NC1_ID);
 
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
index 05b7a26..3a77b60 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
@@ -199,7 +199,7 @@
         int[] fieldPermutation = { 0, 1 };
         TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
                 storageManager, lcManagerProvider, primaryFileSplitProvider, primaryTypeTraits,
-                primaryComparatorFactories, fieldPermutation, 0.7f, true, btreeDataflowHelperFactory,
+                primaryComparatorFactories, fieldPermutation, 0.7f, true, 1000L, btreeDataflowHelperFactory,
                 NoOpOperationCallbackFactory.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
         return primaryBtreeBulkLoad;
@@ -273,7 +273,7 @@
 
     private IOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec, int[] fieldPermutation) {
         LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new LSMInvertedIndexBulkLoadOperatorDescriptor(
-                spec, fieldPermutation, true, storageManager, btreeFileSplitProvider, lcManagerProvider,
+                spec, fieldPermutation, true, 1000L, storageManager, btreeFileSplitProvider, lcManagerProvider,
                 tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, invListsComparatorFactories,
                 tokenizerFactory, invertedIndexDataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, invIndexBulkLoadOp, NC1_ID);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
index 428b354..97f6908 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
@@ -223,7 +223,7 @@
         int[] fieldPermutation = { 0, 1, 2, 4, 5, 7, 9, 10, 11, 12 };
         TreeIndexBulkLoadOperatorDescriptor primaryBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
                 storageManager, lcManagerProvider, primarySplitProvider, primaryTypeTraits, primaryComparatorFactories,
-                fieldPermutation, 0.7f, false, btreeDataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+                fieldPermutation, 0.7f, false, 1000L, btreeDataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBulkLoad, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -279,7 +279,7 @@
         int[] fieldPermutation = { 6, 7, 8, 9, 0 };
         TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
                 storageManager, lcManagerProvider, secondarySplitProvider, secondaryTypeTraits,
-                secondaryComparatorFactories, fieldPermutation, 0.7f, false, rtreeDataflowHelperFactory,
+                secondaryComparatorFactories, fieldPermutation, 0.7f, false, 1000L, rtreeDataflowHelperFactory,
                 NoOpOperationCallbackFactory.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBulkLoad, NC1_ID);
 
diff --git a/hyracks-storage-am-bloomfilter/pom.xml b/hyracks-storage-am-bloomfilter/pom.xml
new file mode 100644
index 0000000..dab96f9
--- /dev/null
+++ b/hyracks-storage-am-bloomfilter/pom.xml
@@ -0,0 +1,42 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>edu.uci.ics.hyracks</groupId>
+  <artifactId>hyracks-storage-am-bloomfilter</artifactId>
+  <version>0.2.2-SNAPSHOT</version>
+
+  <parent>
+    <groupId>edu.uci.ics.hyracks</groupId>
+    <artifactId>hyracks</artifactId>
+    <version>0.2.2-SNAPSHOT</version>
+  </parent>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-storage-am-common</artifactId>
+  		<version>0.2.2-SNAPSHOT</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>  	
+  	<dependency>
+  		<groupId>junit</groupId>
+  		<artifactId>junit</artifactId>
+  		<version>4.8.1</version>
+  		<type>jar</type>
+  		<scope>test</scope>
+  	</dependency>  	  		
+  </dependencies>
+</project>
diff --git a/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilter.java b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
new file mode 100644
index 0000000..a6a45de
--- /dev/null
+++ b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
@@ -0,0 +1,268 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.bloomfilter.impls;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+public class BloomFilter {
+
+    private final static int METADATA_PAGE_ID = 0;
+    private final static int NUM_PAGES_OFFSET = 0; // 0
+    private final static int NUM_HASHES_USED_OFFSET = NUM_PAGES_OFFSET + 4; // 4
+    private final static int NUM_ELEMENTS_OFFSET = NUM_HASHES_USED_OFFSET + 4; // 8
+    private final static int NUM_BITS_OFFSET = NUM_ELEMENTS_OFFSET + 8; // 12
+
+    private final static int NUM_BITS_PER_ELEMENT = 10;
+
+    private final IBufferCache bufferCache;
+    private final IFileMapProvider fileMapProvider;
+    private final FileReference file;
+    private final int[] keyFields;
+    private int fileId = -1;
+    private boolean isActivated = false;
+
+    private int numPages;
+    private int numHashes;
+    private long numElements;
+    private long numBits;
+    private int numBitsPerPage;
+
+    private final ArrayList<ICachedPage> bloomFilterPages = new ArrayList<ICachedPage>();
+    private final static long SEED = 0L;
+
+    public BloomFilter(IBufferCache bufferCache, IFileMapProvider fileMapProvider, FileReference file, int[] keyFields)
+            throws HyracksDataException {
+        this.bufferCache = bufferCache;
+        this.fileMapProvider = fileMapProvider;
+        this.file = file;
+        this.keyFields = keyFields;
+        numBitsPerPage = bufferCache.getPageSize() * Byte.SIZE;
+    }
+
+    public int getFileId() {
+        return fileId;
+    }
+
+    public FileReference getFileReference() {
+        return file;
+    }
+
+    public int getNumPages() throws HyracksDataException {
+        if (!isActivated) {
+            throw new HyracksDataException("The bloom filter is not activated.");
+        }
+        return numPages;
+    }
+
+    public long getNumElements() throws HyracksDataException {
+        if (!isActivated) {
+            throw new HyracksDataException("The bloom filter is not activated.");
+        }
+        return numElements;
+    }
+
+    public boolean contains(ITupleReference tuple, long[] hashes) {
+        MurmurHash128Bit.hash3_x64_128(tuple, keyFields, SEED, hashes);
+        for (int i = 0; i < numHashes; ++i) {
+            long hash = Math.abs((hashes[0] + (long) i * hashes[1]) % numBits);
+
+            ByteBuffer buffer = bloomFilterPages.get((int) (hash / numBitsPerPage)).getBuffer();
+            int byteIndex = (int) (hash % numBitsPerPage) >> 3; // divide by 8
+            byte b = buffer.get(byteIndex);
+            int bitIndex = (int) (hash % numBitsPerPage) & 0x07; // mod 8
+
+            if (!((b & (1L << bitIndex)) != 0)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private void prepareFile() throws HyracksDataException {
+        boolean fileIsMapped = false;
+        synchronized (fileMapProvider) {
+            fileIsMapped = fileMapProvider.isMapped(file);
+            if (!fileIsMapped) {
+                bufferCache.createFile(file);
+            }
+            fileId = fileMapProvider.lookupFileId(file);
+            try {
+                // Also creates the file if it doesn't exist yet.
+                bufferCache.openFile(fileId);
+            } catch (HyracksDataException e) {
+                // Revert state of buffer cache since file failed to open.
+                if (!fileIsMapped) {
+                    bufferCache.deleteFile(fileId, false);
+                }
+                throw e;
+            }
+        }
+    }
+
+    public synchronized void create() throws HyracksDataException {
+        if (isActivated) {
+            throw new HyracksDataException("Failed to create the bloom filter since it is activated.");
+        }
+        prepareFile();
+        ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), true);
+        metaPage.acquireWriteLatch();
+        metaPage.getBuffer().putInt(NUM_PAGES_OFFSET, 0);
+        metaPage.getBuffer().putInt(NUM_HASHES_USED_OFFSET, 0);
+        metaPage.getBuffer().putLong(NUM_ELEMENTS_OFFSET, 0L);
+        metaPage.getBuffer().putLong(NUM_BITS_OFFSET, 0L);
+        metaPage.releaseWriteLatch();
+        bufferCache.unpin(metaPage);
+        bufferCache.closeFile(fileId);
+    }
+
+    public synchronized void activate() throws HyracksDataException {
+        if (isActivated) {
+            return;
+        }
+
+        prepareFile();
+        readBloomFilterMetaData();
+
+        int currentPageId = 1;
+        while (currentPageId <= numPages) {
+            ICachedPage page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, currentPageId), false);
+            bloomFilterPages.add(page);
+            ++currentPageId;
+        }
+        isActivated = true;
+    }
+
+    private void readBloomFilterMetaData() throws HyracksDataException {
+        ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), false);
+        metaPage.acquireReadLatch();
+        numPages = metaPage.getBuffer().getInt(NUM_PAGES_OFFSET);
+        numHashes = metaPage.getBuffer().getInt(NUM_HASHES_USED_OFFSET);
+        numElements = metaPage.getBuffer().getLong(NUM_ELEMENTS_OFFSET);
+        numBits = metaPage.getBuffer().getLong(NUM_BITS_OFFSET);
+        metaPage.releaseReadLatch();
+        bufferCache.unpin(metaPage);
+    }
+
+    public synchronized void deactivate() throws HyracksDataException {
+        if (!isActivated) {
+            return;
+        }
+
+        for (int i = 0; i < numPages; ++i) {
+            bufferCache.unpin(bloomFilterPages.get(i));
+        }
+        bloomFilterPages.clear();
+        bufferCache.closeFile(fileId);
+        isActivated = false;
+    }
+
+    public synchronized void destroy() throws HyracksDataException {
+        if (isActivated) {
+            throw new HyracksDataException("Failed to destroy the bloom filter since it is activated.");
+        }
+
+        file.delete();
+        if (fileId == -1) {
+            return;
+        }
+        bufferCache.deleteFile(fileId, false);
+        fileId = -1;
+    }
+
+    public IIndexBulkLoader createBuilder(long numElements, int numHashes) throws HyracksDataException {
+        return new BloomFilterBuilder(numElements, numHashes);
+    }
+
+    public class BloomFilterBuilder implements IIndexBulkLoader {
+        private final long[] hashes = new long[2];
+
+        private final long numElements;
+        private final int numHashes;
+        private final long numBits;
+        private final int numPages;
+
+        public BloomFilterBuilder(long numElements, int numHashes) throws HyracksDataException {
+            if (!isActivated) {
+                throw new HyracksDataException("Failed to create the bloom filter builder since it is not activated.");
+            }
+            this.numElements = numElements;
+            this.numHashes = numHashes;
+            numBits = numElements * NUM_BITS_PER_ELEMENT;
+            long tmp = (long) Math.ceil(numBits / (double) numBitsPerPage);
+            if (tmp > Integer.MAX_VALUE) {
+                throw new HyracksDataException("Cannot create a bloom filter with his huge number of pages.");
+            }
+            numPages = (int) tmp;
+            persistBloomFilterMetaData();
+            readBloomFilterMetaData();
+
+            int currentPageId = 1;
+            while (currentPageId <= numPages) {
+                ICachedPage page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, currentPageId), true);
+                page.acquireWriteLatch();
+                bloomFilterPages.add(page);
+                ++currentPageId;
+            }
+        }
+
+        private void persistBloomFilterMetaData() throws HyracksDataException {
+            ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), false);
+            metaPage.acquireWriteLatch();
+            metaPage.getBuffer().putInt(NUM_PAGES_OFFSET, numPages);
+            metaPage.getBuffer().putInt(NUM_HASHES_USED_OFFSET, numHashes);
+            metaPage.getBuffer().putLong(NUM_ELEMENTS_OFFSET, numElements);
+            metaPage.getBuffer().putLong(NUM_BITS_OFFSET, numBits);
+            metaPage.releaseWriteLatch();
+            bufferCache.unpin(metaPage);
+        }
+
+        @Override
+        public void add(ITupleReference tuple) throws IndexException, HyracksDataException {
+            MurmurHash128Bit.hash3_x64_128(tuple, keyFields, SEED, hashes);
+            for (int i = 0; i < numHashes; ++i) {
+                long hash = Math.abs((hashes[0] + (long) i * hashes[1]) % numBits);
+
+                ByteBuffer buffer = bloomFilterPages.get((int) (hash / numBitsPerPage)).getBuffer();
+                int byteIndex = (int) (hash % numBitsPerPage) >> 3; // divide by 8
+                byte b = buffer.get(byteIndex);
+                int bitIndex = (int) (hash % numBitsPerPage) & 0x07; // mod 8
+                b = (byte) (b | (1 << bitIndex));
+
+                buffer.put(byteIndex, b);
+            }
+        }
+
+        @Override
+        public void end() throws HyracksDataException, IndexException {
+            for (int i = 0; i < numPages; ++i) {
+                ICachedPage page = bloomFilterPages.get(i);
+                page.releaseWriteLatch();
+            }
+        }
+
+    }
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilterFactory.java b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilterFactory.java
new file mode 100644
index 0000000..8bd419a
--- /dev/null
+++ b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilterFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.bloomfilter.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+public class BloomFilterFactory {
+    private final IBufferCache bufferCache;
+    private final IFileMapProvider fileMapProvider;
+    private final int[] keyFields;
+
+    public BloomFilterFactory(IBufferCache bufferCache, IFileMapProvider fileMapProvider, int[] keyFields) {
+        this.bufferCache = bufferCache;
+        this.fileMapProvider = fileMapProvider;
+        this.keyFields = keyFields;
+    }
+
+    public BloomFilter createBloomFiltertInstance(FileReference file) throws HyracksDataException {
+        return new BloomFilter(bufferCache, fileMapProvider, file, keyFields);
+    }
+}
diff --git a/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilterSpecification.java b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilterSpecification.java
new file mode 100644
index 0000000..c396f9b
--- /dev/null
+++ b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilterSpecification.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.bloomfilter.impls;
+
+public final class BloomFilterSpecification {
+    private final long numElements;
+    private final int numHashes;
+
+    public BloomFilterSpecification(long numElements, int numHashes) {
+        this.numElements = numElements;
+        this.numHashes = numHashes;
+    }
+
+    public long getNumElements() {
+        return numElements;
+    }
+
+    public int getNumHashes() {
+        return numHashes;
+    }
+}
diff --git a/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/MurmurHash128Bit.java b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/MurmurHash128Bit.java
new file mode 100644
index 0000000..b1ec77d
--- /dev/null
+++ b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/MurmurHash128Bit.java
@@ -0,0 +1,256 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.bloomfilter.impls;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * The idea of this class is borrowed from http://murmurhash.googlepages.com/ and cassandra source code.3
+ * We changed the hash function to operate on ITupleReference instead of a byte array.
+ **/
+public class MurmurHash128Bit {
+
+    private final static int DUMMY_FIELD = 0;
+
+    public static long rotl64(long v, int n) {
+        return ((v << n) | (v >>> (64 - n)));
+    }
+
+    public static long fmix(long k) {
+        k ^= k >>> 33;
+        k *= 0xff51afd7ed558ccdL;
+        k ^= k >>> 33;
+        k *= 0xc4ceb9fe1a85ec53L;
+        k ^= k >>> 33;
+
+        return k;
+    }
+
+    public static void hash3_x64_128(ITupleReference tuple, int[] keyFields, long seed, long[] hashes) {
+        int length = 0;
+        for (int i = 0; i < keyFields.length; ++i) {
+            length += tuple.getFieldLength(keyFields[i]);
+        }
+        final int nblocks = length >> 4; // Process as 128-bit blocks.
+
+        long h1 = seed;
+        long h2 = seed;
+
+        long c1 = 0x87c37b91114253d5L;
+        long c2 = 0x4cf5ad432745937fL;
+
+        //----------
+        // body
+
+        int currentFieldIndex = 0;
+        int bytePos = 0;
+        for (int i = 0; i < nblocks; ++i) {
+
+            long k1 = 0L;
+            for (int j = 0; j < 8; ++j) {
+                k1 += (((long) tuple.getFieldData(DUMMY_FIELD)[tuple.getFieldStart(keyFields[currentFieldIndex])
+                        + bytePos] & 0xff) << (j << 3));
+                ++bytePos;
+                if (tuple.getFieldLength(keyFields[currentFieldIndex]) == bytePos) {
+                    ++currentFieldIndex;
+                    bytePos = 0;
+                }
+            }
+            long k2 = 0L;
+            for (int j = 0; j < 8; ++j) {
+                k2 += (((long) tuple.getFieldData(DUMMY_FIELD)[tuple.getFieldStart(keyFields[currentFieldIndex])
+                        + bytePos] & 0xff) << (j << 3));
+                ++bytePos;
+                if (tuple.getFieldLength(keyFields[currentFieldIndex]) == bytePos) {
+                    ++currentFieldIndex;
+                    bytePos = 0;
+                }
+            }
+
+            k1 *= c1;
+            k1 = rotl64(k1, 31);
+            k1 *= c2;
+            h1 ^= k1;
+
+            h1 = rotl64(h1, 27);
+            h1 += h2;
+            h1 = h1 * 5 + 0x52dce729;
+
+            k2 *= c2;
+            k2 = rotl64(k2, 33);
+            k2 *= c1;
+            h2 ^= k2;
+
+            h2 = rotl64(h2, 31);
+            h2 += h1;
+            h2 = h2 * 5 + 0x38495ab5;
+        }
+
+        //----------
+        // tail
+
+        long k1 = 0L;
+        long k2 = 0L;
+
+        currentFieldIndex = keyFields.length - 1;
+        bytePos = tuple.getFieldLength(keyFields[currentFieldIndex]) - 1;
+        switch (length & 15) {
+            case 15:
+                k2 ^= ((long) tuple.getFieldData(DUMMY_FIELD)[tuple.getFieldStart(keyFields[currentFieldIndex])
+                        + bytePos]) << 48;
+                --bytePos;
+                if (bytePos == -1) {
+                    --currentFieldIndex;
+                    bytePos = tuple.getFieldLength(keyFields[currentFieldIndex]) - 1;
+                }
+            case 14:
+                k2 ^= ((long) tuple.getFieldData(DUMMY_FIELD)[tuple.getFieldStart(keyFields[currentFieldIndex])
+                        + bytePos]) << 40;
+                --bytePos;
+                if (bytePos == -1) {
+                    --currentFieldIndex;
+                    bytePos = tuple.getFieldLength(keyFields[currentFieldIndex]) - 1;
+                }
+            case 13:
+                k2 ^= ((long) tuple.getFieldData(DUMMY_FIELD)[tuple.getFieldStart(keyFields[currentFieldIndex])
+                        + bytePos]) << 32;
+                --bytePos;
+                if (bytePos == -1) {
+                    --currentFieldIndex;
+                    bytePos = tuple.getFieldLength(keyFields[currentFieldIndex]) - 1;
+                }
+            case 12:
+                k2 ^= ((long) tuple.getFieldData(DUMMY_FIELD)[tuple.getFieldStart(keyFields[currentFieldIndex])
+                        + bytePos]) << 24;
+                --bytePos;
+                if (bytePos == -1) {
+                    --currentFieldIndex;
+                    bytePos = tuple.getFieldLength(keyFields[currentFieldIndex]) - 1;
+                }
+            case 11:
+                k2 ^= ((long) tuple.getFieldData(DUMMY_FIELD)[tuple.getFieldStart(keyFields[currentFieldIndex])
+                        + bytePos]) << 16;
+                --bytePos;
+                if (bytePos == -1) {
+                    --currentFieldIndex;
+                    bytePos = tuple.getFieldLength(keyFields[currentFieldIndex]) - 1;
+                }
+            case 10:
+                k2 ^= ((long) tuple.getFieldData(DUMMY_FIELD)[tuple.getFieldStart(keyFields[currentFieldIndex])
+                        + bytePos]) << 8;
+                --bytePos;
+                if (bytePos == -1) {
+                    --currentFieldIndex;
+                    bytePos = tuple.getFieldLength(keyFields[currentFieldIndex]) - 1;
+                }
+            case 9:
+                k2 ^= ((long) tuple.getFieldData(DUMMY_FIELD)[tuple.getFieldStart(keyFields[currentFieldIndex])
+                        + bytePos]);
+                --bytePos;
+                if (bytePos == -1) {
+                    --currentFieldIndex;
+                    bytePos = tuple.getFieldLength(keyFields[currentFieldIndex]) - 1;
+                }
+                k2 *= c2;
+                k2 = rotl64(k2, 33);
+                k2 *= c1;
+                h2 ^= k2;
+
+            case 8:
+                k1 ^= ((long) tuple.getFieldData(DUMMY_FIELD)[tuple.getFieldStart(keyFields[currentFieldIndex])
+                        + bytePos]) << 56;
+                --bytePos;
+                if (bytePos == -1) {
+                    --currentFieldIndex;
+                    bytePos = tuple.getFieldLength(keyFields[currentFieldIndex]) - 1;
+                }
+            case 7:
+                k1 ^= ((long) tuple.getFieldData(DUMMY_FIELD)[tuple.getFieldStart(keyFields[currentFieldIndex])
+                        + bytePos]) << 48;
+                --bytePos;
+                if (bytePos == -1) {
+                    --currentFieldIndex;
+                    bytePos = tuple.getFieldLength(keyFields[currentFieldIndex]) - 1;
+                }
+            case 6:
+                k1 ^= ((long) tuple.getFieldData(DUMMY_FIELD)[tuple.getFieldStart(keyFields[currentFieldIndex])
+                        + bytePos]) << 40;
+                --bytePos;
+                if (bytePos == -1) {
+                    --currentFieldIndex;
+                    bytePos = tuple.getFieldLength(keyFields[currentFieldIndex]) - 1;
+                }
+            case 5:
+                k1 ^= ((long) tuple.getFieldData(DUMMY_FIELD)[tuple.getFieldStart(keyFields[currentFieldIndex])
+                        + bytePos]) << 32;
+                --bytePos;
+                if (bytePos == -1) {
+                    --currentFieldIndex;
+                    bytePos = tuple.getFieldLength(keyFields[currentFieldIndex]) - 1;
+                }
+            case 4:
+                k1 ^= ((long) tuple.getFieldData(DUMMY_FIELD)[tuple.getFieldStart(keyFields[currentFieldIndex])
+                        + bytePos]) << 24;
+                --bytePos;
+                if (bytePos == -1) {
+                    --currentFieldIndex;
+                    bytePos = tuple.getFieldLength(keyFields[currentFieldIndex]) - 1;
+                }
+            case 3:
+                k1 ^= ((long) tuple.getFieldData(DUMMY_FIELD)[tuple.getFieldStart(keyFields[currentFieldIndex])
+                        + bytePos]) << 16;
+                --bytePos;
+                if (bytePos == -1) {
+                    --currentFieldIndex;
+                    bytePos = tuple.getFieldLength(keyFields[currentFieldIndex]) - 1;
+                }
+            case 2:
+                k1 ^= ((long) tuple.getFieldData(DUMMY_FIELD)[tuple.getFieldStart(keyFields[currentFieldIndex])
+                        + bytePos]) << 8;
+                --bytePos;
+                if (bytePos == -1) {
+                    --currentFieldIndex;
+                    bytePos = tuple.getFieldLength(keyFields[currentFieldIndex]) - 1;
+                }
+            case 1:
+                k1 ^= ((long) tuple.getFieldData(DUMMY_FIELD)[tuple.getFieldStart(keyFields[currentFieldIndex])
+                        + bytePos]);
+                k1 *= c1;
+                k1 = rotl64(k1, 31);
+                k1 *= c2;
+                h1 ^= k1;
+        };
+
+        //----------
+        // finalization
+
+        h1 ^= length;
+        h2 ^= length;
+
+        h1 += h2;
+        h2 += h1;
+
+        h1 = fmix(h1);
+        h2 = fmix(h2);
+
+        h1 += h2;
+        h2 += h1;
+
+        hashes[0] = h1;
+        hashes[1] = h2;
+    }
+
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
index d825b89..86bc32a 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
@@ -910,7 +910,8 @@
     }
 
     @Override
-    public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput) throws TreeIndexException {
+    public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+            throws TreeIndexException {
         try {
             return new BTreeBulkLoader(fillFactor, verifyInput);
         } catch (HyracksDataException e) {
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndex.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndex.java
index d6d74ee..1557c75 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndex.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndex.java
@@ -22,69 +22,62 @@
  * This interface describes the operations common to all indexes. Indexes
  * implementing this interface can easily reuse existing index operators for
  * dataflow. Users must perform operations on an via an {@link IIndexAccessor}.
- * 
- * During dataflow, the lifecycle of IIndexes are handled through an 
- * {@link IIndexLifecycleManager}.
+ * During dataflow, the lifecycle of IIndexes are handled through an {@link IIndexLifecycleManager}.
  */
 public interface IIndex {
 
     /**
-     * Initializes the persistent state of an index. 
-     * 
+     * Initializes the persistent state of an index.
      * An index cannot be created if it is in the activated state.
      * Calling create on an index that is deactivated has the effect of clearing the index.
      * 
-     * @throws HyracksDataException 
-     *          if there is an error in the BufferCache while (un)pinning pages, (un)latching pages, 
-     *          creating files, or deleting files
-     *          
-     *          if the index is in the activated state
+     * @throws HyracksDataException
+     *             if there is an error in the BufferCache while (un)pinning pages, (un)latching pages,
+     *             creating files, or deleting files
+     *             if the index is in the activated state
      */
     public void create() throws HyracksDataException;
 
     /**
-     * Initializes the index's operational state. An index in the activated state may perform 
+     * Initializes the index's operational state. An index in the activated state may perform
      * operations via an {@link IIndexAccessor}.
      * 
      * @throws HyracksDataException
-     *          if there is a problem in the BufferCache while (un)pinning pages, (un)latching pages, 
-     *          creating files, or deleting files
+     *             if there is a problem in the BufferCache while (un)pinning pages, (un)latching pages,
+     *             creating files, or deleting files
      */
     public void activate() throws HyracksDataException;
 
     /**
-     * Resets the operational state of the index. Calling clear has the same logical effect 
-     * as calling deactivate(), destroy(), create(), then activate(), but not necessarily the 
+     * Resets the operational state of the index. Calling clear has the same logical effect
+     * as calling deactivate(), destroy(), create(), then activate(), but not necessarily the
      * same physical effect.
      * 
      * @throws HyracksDataException
-     *          if there is a problem in the BufferCache while (un)pinning pages, (un)latching pages, 
-     *          creating files, or deleting files
-     *          
-     *          if the index is not in the activated state
+     *             if there is a problem in the BufferCache while (un)pinning pages, (un)latching pages,
+     *             creating files, or deleting files
+     *             if the index is not in the activated state
      */
     public void clear() throws HyracksDataException;
 
     /**
-     * Deinitializes the index's operational state. An index in the deactivated state may not 
+     * Deinitializes the index's operational state. An index in the deactivated state may not
      * perform operations.
      * 
      * @throws HyracksDataException
-     *          if there is a problem in the BufferCache while (un)pinning pages, (un)latching pages, 
-     *          creating files, or deleting files
+     *             if there is a problem in the BufferCache while (un)pinning pages, (un)latching pages,
+     *             creating files, or deleting files
      */
     public void deactivate() throws HyracksDataException;
 
     /**
-     * Removes the persistent state of an index. 
-     * 
+     * Removes the persistent state of an index.
      * An index cannot be destroyed if it is in the activated state.
      * 
-     * @throws HyracksDataException 
-     *          if there is an error in the BufferCache while (un)pinning pages, (un)latching pages, 
-     *          creating files, or deleting files
-     *          
-     *          if the index is already activated
+     * @throws HyracksDataException
+     *             if there is an error in the BufferCache while (un)pinning pages, (un)latching pages,
+     *             creating files, or deleting files
+     *             if the index is already activated
      */
     public void destroy() throws HyracksDataException;
 
@@ -94,8 +87,10 @@
      * on the same {@link IIndex}.
      * 
      * @returns IIndexAccessor an accessor for this {@link IIndex}
-     * @param modificationCallback the callback to be used for modification operations
-     * @param searchCallback the callback to be used for search operations
+     * @param modificationCallback
+     *            the callback to be used for modification operations
+     * @param searchCallback
+     *            the callback to be used for search operations
      */
     public IIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
             ISearchOperationCallback searchCallback);
@@ -105,7 +100,7 @@
      * An assertion error is thrown if validation fails.
      * 
      * @throws HyracksDataException
-     *          if there is an error performing validation
+     *             if there is an error performing validation
      */
     public void validate() throws HyracksDataException;
 
@@ -124,5 +119,6 @@
      * @param verifyInput
      * @throws IndexException
      */
-    public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput) throws IndexException;
+    public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+            throws IndexException;
 }
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index de4e627..1b6271d 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -33,6 +33,7 @@
     private final IHyracksTaskContext ctx;
     private final float fillFactor;
     private final boolean verifyInput;
+    private final long numElementsHint;
     private final IIndexDataflowHelper indexHelper;
     private FrameTupleAccessor accessor;
     private IIndex index;
@@ -41,12 +42,14 @@
     private PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
 
     public IndexBulkLoadOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
-            int[] fieldPermutation, float fillFactor, boolean verifyInput, IRecordDescriptorProvider recordDescProvider) {
+            int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint,
+            IRecordDescriptorProvider recordDescProvider) {
         this.opDesc = opDesc;
         this.ctx = ctx;
         this.indexHelper = opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(opDesc, ctx, partition);
         this.fillFactor = fillFactor;
         this.verifyInput = verifyInput;
+        this.numElementsHint = numElementsHint;
         this.recDescProvider = recordDescProvider;
         tuple.setFieldPermutation(fieldPermutation);
     }
@@ -58,7 +61,7 @@
         indexHelper.open();
         index = indexHelper.getIndexInstance();
         try {
-            bulkLoader = index.createBulkLoader(fillFactor, verifyInput);
+            bulkLoader = index.createBulkLoader(fillFactor, verifyInput, numElementsHint);
         } catch (Exception e) {
             indexHelper.close();
             throw new HyracksDataException(e);
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
index fea6463..c58dee5 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
@@ -35,12 +35,13 @@
     private final int[] fieldPermutation;
     private final float fillFactor;
     private final boolean verifyInput;
+    private final long numElementsHint;
 
     public TreeIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec,
             IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lifecycleManagerProvider,
             IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
             IBinaryComparatorFactory[] comparatorFactories, int[] fieldPermutation, float fillFactor,
-            boolean verifyInput, IIndexDataflowHelperFactory dataflowHelperFactory,
+            boolean verifyInput, long numElementsHint, IIndexDataflowHelperFactory dataflowHelperFactory,
             IModificationOperationCallbackFactory modificationOpCallbackFactory) {
         super(spec, 1, 0, null, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, dataflowHelperFactory, null, false, NoOpLocalResourceFactoryProvider.INSTANCE,
@@ -48,12 +49,13 @@
         this.fieldPermutation = fieldPermutation;
         this.fillFactor = fillFactor;
         this.verifyInput = verifyInput;
+        this.numElementsHint = numElementsHint;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
         return new IndexBulkLoadOperatorNodePushable(this, ctx, partition, fieldPermutation, fillFactor, verifyInput,
-                recordDescProvider);
+                numElementsHint, recordDescProvider);
     }
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-btree/pom.xml b/hyracks-storage-am-lsm-btree/pom.xml
index 17f3714..afef819 100644
--- a/hyracks-storage-am-lsm-btree/pom.xml
+++ b/hyracks-storage-am-lsm-btree/pom.xml
@@ -33,6 +33,13 @@
   	</dependency> 
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-storage-am-bloomfilter</artifactId>
+  		<version>0.2.2-SNAPSHOT</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency> 
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-storage-am-lsm-common</artifactId>
   		<version>0.2.2-SNAPSHOT</version>
   		<type>jar</type>
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 4cd4974..b557125 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -23,8 +23,12 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory;
 import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeBulkLoader;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
@@ -64,7 +68,6 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallbackWrapper;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMFlushOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
@@ -86,13 +89,16 @@
     private final ITreeIndexFrameFactory deleteLeafFrameFactory;
     private final IBinaryComparatorFactory[] cmpFactories;
 
+    private final int numHashes = 10;
+
     public LSMBTree(IInMemoryBufferCache memBufferCache, IInMemoryFreePageManager memFreePageManager,
             ITreeIndexFrameFactory interiorFrameFactory, ITreeIndexFrameFactory insertLeafFrameFactory,
             ITreeIndexFrameFactory deleteLeafFrameFactory, ILSMIndexFileManager fileManager,
             TreeIndexFactory<BTree> diskBTreeFactory, TreeIndexFactory<BTree> bulkLoadBTreeFactory,
-            IFileMapProvider diskFileMapProvider, int fieldCount, IBinaryComparatorFactory[] cmpFactories,
-            ILSMMergePolicy mergePolicy, ILSMOperationTrackerFactory opTrackerFactory,
-            ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackProvider ioOpCallbackProvider) {
+            BloomFilterFactory bloomFilterFactory, IFileMapProvider diskFileMapProvider, int fieldCount,
+            IBinaryComparatorFactory[] cmpFactories, ILSMMergePolicy mergePolicy,
+            ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationScheduler ioScheduler,
+            ILSMIOOperationCallbackProvider ioOpCallbackProvider) {
         super(memFreePageManager, diskBTreeFactory.getBufferCache(), fileManager, diskFileMapProvider, mergePolicy,
                 opTrackerFactory, ioScheduler, ioOpCallbackProvider);
         mutableComponent = new LSMBTreeMutableComponent(new BTree(memBufferCache,
@@ -102,8 +108,8 @@
         this.insertLeafFrameFactory = insertLeafFrameFactory;
         this.deleteLeafFrameFactory = deleteLeafFrameFactory;
         this.cmpFactories = cmpFactories;
-        componentFactory = new LSMBTreeImmutableComponentFactory(diskBTreeFactory);
-        bulkLoadComponentFactory = new LSMBTreeImmutableComponentFactory(bulkLoadBTreeFactory);
+        componentFactory = new LSMBTreeImmutableComponentFactory(diskBTreeFactory, bloomFilterFactory);
+        bulkLoadComponentFactory = new LSMBTreeImmutableComponentFactory(bulkLoadBTreeFactory, bloomFilterFactory);
     }
 
     @Override
@@ -135,14 +141,15 @@
             throw new HyracksDataException(e);
         }
         for (LSMComponentFileReferences lsmComonentFileReference : validFileReferences) {
-            LSMBTreeImmutableComponent btree;
+            LSMBTreeImmutableComponent component;
             try {
-                btree = createDiskComponent(componentFactory, lsmComonentFileReference.getInsertIndexFileReference(),
-                        false);
+                component = createDiskComponent(componentFactory,
+                        lsmComonentFileReference.getInsertIndexFileReference(),
+                        lsmComonentFileReference.getBloomFilterFileReference(), 0L, 0, false);
             } catch (IndexException e) {
                 throw new HyracksDataException(e);
             }
-            immutableComponents.add(btree);
+            immutableComponents.add(component);
         }
         isActivated = true;
     }
@@ -167,8 +174,11 @@
 
         List<ILSMComponent> immutableComponents = componentsRef.get();
         for (ILSMComponent c : immutableComponents) {
-            BTree btree = (BTree) ((LSMBTreeImmutableComponent) c).getBTree();
+            LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) c;
+            BTree btree = component.getBTree();
+            BloomFilter bloomFilter = component.getBloomFilter();
             btree.deactivate();
+            bloomFilter.deactivate();
         }
         mutableComponent.getBTree().deactivate();
         mutableComponent.getBTree().destroy();
@@ -189,8 +199,9 @@
 
         List<ILSMComponent> immutableComponents = componentsRef.get();
         for (ILSMComponent c : immutableComponents) {
-            BTree btree = (BTree) ((LSMBTreeImmutableComponent) c).getBTree();
-            btree.destroy();
+            LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) c;
+            component.getBTree().destroy();
+            component.getBloomFilter().destroy();
         }
         mutableComponent.getBTree().destroy();
         fileManager.deleteDirs();
@@ -205,9 +216,11 @@
         List<ILSMComponent> immutableComponents = componentsRef.get();
         mutableComponent.getBTree().clear();
         for (ILSMComponent c : immutableComponents) {
-            BTree btree = (BTree) ((LSMBTreeImmutableComponent) c).getBTree();
-            btree.deactivate();
-            btree.destroy();
+            LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) c;
+            component.getBloomFilter().deactivate();
+            component.getBTree().deactivate();
+            component.getBloomFilter().destroy();
+            component.getBTree().destroy();
         }
         immutableComponents.clear();
     }
@@ -346,28 +359,48 @@
         opCtx.setOperation(IndexOperation.FLUSH);
         opCtx.getComponentHolder().add(flushingComponent);
         ILSMIndexAccessorInternal flushAccessor = new LSMBTreeAccessor(lsmHarness, opCtx);
-        ioScheduler.scheduleOperation(new LSMFlushOperation(flushAccessor, flushingComponent, componentFileRefs
-                .getInsertIndexFileReference(), callback));
+        ioScheduler.scheduleOperation(new LSMBTreeFlushOperation(flushAccessor, flushingComponent, componentFileRefs
+                .getInsertIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), callback));
     }
 
     @Override
     public ILSMComponent flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
-        LSMFlushOperation flushOp = (LSMFlushOperation) operation;
+        LSMBTreeFlushOperation flushOp = (LSMBTreeFlushOperation) operation;
         LSMBTreeMutableComponent flushingComponent = (LSMBTreeMutableComponent) flushOp.getFlushingComponent();
         IIndexAccessor accessor = flushingComponent.getBTree().createAccessor(NoOpOperationCallback.INSTANCE,
                 NoOpOperationCallback.INSTANCE);
-        IIndexCursor scanCursor = accessor.createSearchCursor();
+
         RangePredicate nullPred = new RangePredicate(null, null, true, true, null, null);
+        IIndexCursor countingCursor = ((BTreeAccessor) accessor).createCountingSearchCursor();
+        accessor.search(countingCursor, nullPred);
+        long numElements = 0L;
+        try {
+            while (countingCursor.hasNext()) {
+                countingCursor.next();
+                ITupleReference countTuple = countingCursor.getTuple();
+                numElements = IntegerSerializerDeserializer.getInt(countTuple.getFieldData(0),
+                        countTuple.getFieldStart(0));
+            }
+        } finally {
+            countingCursor.close();
+        }
+
+        LSMBTreeImmutableComponent component = createDiskComponent(componentFactory, flushOp.getBTreeFlushTarget(),
+                flushOp.getBloomFilterFlushTarget(), numElements, numHashes, true);
+        IIndexBulkLoader bulkLoader = component.getBTree().createBulkLoader(1.0f, false, numElements);
+        IIndexBulkLoader builder = component.getBloomFilter().createBuilder(numElements, numHashes);
+
+        IIndexCursor scanCursor = accessor.createSearchCursor();
         accessor.search(scanCursor, nullPred);
-        LSMBTreeImmutableComponent component = createDiskComponent(componentFactory, flushOp.getFlushTarget(), true);
-        IIndexBulkLoader bulkLoader = component.getBTree().createBulkLoader(1.0f, false);
         try {
             while (scanCursor.hasNext()) {
                 scanCursor.next();
+                builder.add(scanCursor.getTuple());
                 bulkLoader.add(scanCursor.getTuple());
             }
         } finally {
             scanCursor.close();
+            builder.end();
         }
         bulkLoader.end();
         return component;
@@ -392,7 +425,7 @@
                 .getName(), lastFile.getFile().getName());
         ILSMIndexAccessorInternal accessor = new LSMBTreeAccessor(lsmHarness, opCtx);
         ioScheduler.scheduleOperation(new LSMBTreeMergeOperation(accessor, mergingComponents, cursor, relMergeFileRefs
-                .getInsertIndexFileReference(), callback));
+                .getInsertIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), callback));
     }
 
     @Override
@@ -401,71 +434,103 @@
         LSMBTreeMergeOperation mergeOp = (LSMBTreeMergeOperation) operation;
         ITreeIndexCursor cursor = mergeOp.getCursor();
         mergedComponents.addAll(mergeOp.getMergingComponents());
-        LSMBTreeImmutableComponent mergedBTree = createDiskComponent(componentFactory, mergeOp.getMergeTarget(), true);
-        IIndexBulkLoader bulkLoader = mergedBTree.getBTree().createBulkLoader(1.0f, false);
+
+        long numElements = 0L;
+        for (int i = 0; i < mergedComponents.size(); ++i) {
+            numElements += ((LSMBTreeImmutableComponent) mergedComponents.get(i)).getBloomFilter().getNumElements();
+        }
+
+        LSMBTreeImmutableComponent mergedComponent = createDiskComponent(componentFactory,
+                mergeOp.getBTreeMergeTarget(), mergeOp.getBloomFilterMergeTarget(), numElements, numHashes, true);
+
+        IIndexBulkLoader bulkLoader = mergedComponent.getBTree().createBulkLoader(1.0f, false, numElements);
+        IIndexBulkLoader builder = mergedComponent.getBloomFilter().createBuilder(numElements, numHashes);
         try {
             while (cursor.hasNext()) {
                 cursor.next();
                 ITupleReference frameTuple = cursor.getTuple();
+                builder.add(frameTuple);
                 bulkLoader.add(frameTuple);
             }
         } finally {
             cursor.close();
+            builder.end();
         }
         bulkLoader.end();
-        return mergedBTree;
+        return mergedComponent;
     }
 
     private LSMBTreeImmutableComponent createDiskComponent(LSMBTreeImmutableComponentFactory factory,
-            FileReference fileRef, boolean createComponent) throws HyracksDataException, IndexException {
+            FileReference btreeFileRef, FileReference bloomFilterFileRef, long numElements, int numHashes,
+            boolean createComponent) throws HyracksDataException, IndexException {
         // Create new BTree instance.
         LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) factory
-                .createLSMComponentInstance(new LSMComponentFileReferences(fileRef, null));
+                .createLSMComponentInstance(new LSMComponentFileReferences(btreeFileRef, null, bloomFilterFileRef));
         if (createComponent) {
             component.getBTree().create();
+            component.getBloomFilter().create();
         }
         // BTree will be closed during cleanup of merge().
         component.getBTree().activate();
+        component.getBloomFilter().activate();
         return component;
     }
 
     @Override
-    public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput) throws TreeIndexException {
-        return new LSMBTreeBulkLoader(fillLevel, verifyInput);
+    public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
+            throws TreeIndexException {
+        try {
+            return new LSMBTreeBulkLoader(fillLevel, verifyInput, numElementsHint);
+        } catch (HyracksDataException e) {
+            throw new TreeIndexException(e);
+        }
     }
 
-    private ILSMComponent createBulkLoadTarget() throws HyracksDataException, IndexException {
+    private ILSMComponent createBulkLoadTarget(long numElementsHint) throws HyracksDataException, IndexException {
         LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
-        return createDiskComponent(bulkLoadComponentFactory, componentFileRefs.getInsertIndexFileReference(), true);
+        return createDiskComponent(bulkLoadComponentFactory, componentFileRefs.getInsertIndexFileReference(),
+                componentFileRefs.getBloomFilterFileReference(), numElementsHint, numHashes, true);
     }
 
     @Override
     public void markAsValid(ILSMComponent lsmComponent) throws HyracksDataException {
-        BTree btree = ((LSMBTreeImmutableComponent) lsmComponent).getBTree();
-        forceFlushDirtyPages(btree);
-        markAsValidInternal(btree);
+        // The order of forcing the dirty page to be flushed is critical. The bloom filter must be always done first.
+        LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) lsmComponent;
+        int fileId = component.getBloomFilter().getFileId();
+        IBufferCache bufferCache = component.getBTree().getBufferCache();
+        int startPage = 0;
+        int maxPage = component.getBloomFilter().getNumPages();
+        forceFlushDirtyPages(bufferCache, fileId, startPage, maxPage);
+        forceFlushDirtyPages(component.getBTree());
+        markAsValidInternal(component.getBTree());
     }
 
     public class LSMBTreeBulkLoader implements IIndexBulkLoader {
         private final ILSMComponent component;
         private final BTreeBulkLoader bulkLoader;
+        private final IIndexBulkLoader builder;
+        private boolean endCalledBasedOnFailure = false;
 
-        public LSMBTreeBulkLoader(float fillFactor, boolean verifyInput) throws TreeIndexException {
+        public LSMBTreeBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+                throws TreeIndexException, HyracksDataException {
             try {
-                component = createBulkLoadTarget();
+                component = createBulkLoadTarget(numElementsHint);
             } catch (HyracksDataException e) {
                 throw new TreeIndexException(e);
             } catch (IndexException e) {
                 throw new TreeIndexException(e);
             }
             bulkLoader = (BTreeBulkLoader) ((LSMBTreeImmutableComponent) component).getBTree().createBulkLoader(
-                    fillFactor, verifyInput);
+                    fillFactor, verifyInput, numElementsHint);
+            builder = ((LSMBTreeImmutableComponent) component).getBloomFilter().createBuilder(numElementsHint,
+                    numHashes);
         }
 
         @Override
         public void add(ITupleReference tuple) throws IndexException, HyracksDataException {
             try {
                 bulkLoader.add(tuple);
+                builder.add(tuple);
             } catch (IndexException e) {
                 handleException();
                 throw e;
@@ -478,14 +543,21 @@
             }
         }
 
-        protected void handleException() throws HyracksDataException {
+        protected void handleException() throws HyracksDataException, IndexException {
             ((LSMBTreeImmutableComponent) component).getBTree().deactivate();
+            if (!endCalledBasedOnFailure) {
+                builder.end();
+            }
+            ((LSMBTreeImmutableComponent) component).getBloomFilter().deactivate();
             ((LSMBTreeImmutableComponent) component).getBTree().destroy();
+            ((LSMBTreeImmutableComponent) component).getBloomFilter().destroy();
         }
 
         @Override
         public void end() throws HyracksDataException, IndexException {
             bulkLoader.end();
+            builder.end();
+            endCalledBasedOnFailure = true;
             lsmHarness.addBulkLoadedComponent(component);
         }
 
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
new file mode 100644
index 0000000..e1dbc9e
--- /dev/null
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
@@ -0,0 +1,202 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+public class LSMBTreeFileManager extends AbstractLSMIndexFileManager {
+    private static final String BTREE_STRING = "b";
+    private static final String BLOOM_FILTER_STRING = "f";
+
+    private final TreeIndexFactory<? extends ITreeIndex> btreeFactory;
+
+    public LSMBTreeFileManager(IIOManager ioManager, IFileMapProvider fileMapProvider, FileReference file,
+            TreeIndexFactory<? extends ITreeIndex> btreeFactory, int startIODeviceIndex) {
+        super(ioManager, fileMapProvider, file, null, startIODeviceIndex);
+        this.btreeFactory = btreeFactory;
+    }
+
+    protected void cleanupAndGetValidFilesInternal(IODeviceHandle dev, FilenameFilter filter,
+            TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<ComparableFileName> allFiles)
+            throws HyracksDataException, IndexException {
+        File dir = new File(dev.getPath(), baseDir);
+        String[] files = dir.list(filter);
+        for (String fileName : files) {
+            File file = new File(dir.getPath() + File.separator + fileName);
+            FileReference fileRef = new FileReference(file);
+            if (treeFactory == null || isValidTreeIndex(treeFactory.createIndexInstance(fileRef))) {
+                allFiles.add(new ComparableFileName(fileRef));
+            } else {
+                file.delete();
+            }
+        }
+    }
+
+    @Override
+    public LSMComponentFileReferences getRelFlushFileReference() {
+        Date date = new Date();
+        String ts = formatter.format(date);
+        String baseName = baseDir + ts + SPLIT_STRING + ts;
+        // Begin timestamp and end timestamp are identical since it is a flush
+        return new LSMComponentFileReferences(createFlushFile(baseName + SPLIT_STRING + BTREE_STRING), null,
+                createFlushFile(baseName + SPLIT_STRING + BLOOM_FILTER_STRING));
+    }
+
+    @Override
+    public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName)
+            throws HyracksDataException {
+        String[] firstTimestampRange = firstFileName.split(SPLIT_STRING);
+        String[] lastTimestampRange = lastFileName.split(SPLIT_STRING);
+
+        String baseName = baseDir + firstTimestampRange[0] + SPLIT_STRING + lastTimestampRange[1];
+        // Get the range of timestamps by taking the earliest and the latest timestamps
+        return new LSMComponentFileReferences(createMergeFile(baseName + SPLIT_STRING + BTREE_STRING), null,
+                createMergeFile(baseName + SPLIT_STRING + BLOOM_FILTER_STRING));
+    }
+
+    private static FilenameFilter btreeFilter = new FilenameFilter() {
+        public boolean accept(File dir, String name) {
+            return !name.startsWith(".") && name.endsWith(BTREE_STRING);
+        }
+    };
+
+    private static FilenameFilter bloomFilterFilter = new FilenameFilter() {
+        public boolean accept(File dir, String name) {
+            return !name.startsWith(".") && name.endsWith(BLOOM_FILTER_STRING);
+        }
+    };
+
+    @Override
+    public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException, IndexException {
+        List<LSMComponentFileReferences> validFiles = new ArrayList<LSMComponentFileReferences>();
+        ArrayList<ComparableFileName> allBTreeFiles = new ArrayList<ComparableFileName>();
+        ArrayList<ComparableFileName> allBloomFilterFiles = new ArrayList<ComparableFileName>();
+
+        // Gather files from all IODeviceHandles.
+        for (IODeviceHandle dev : ioManager.getIODevices()) {
+            cleanupAndGetValidFilesInternal(dev, bloomFilterFilter, null, allBloomFilterFiles);
+            HashSet<String> bloomFilterFilesSet = new HashSet<String>();
+            for (ComparableFileName cmpFileName : allBloomFilterFiles) {
+                int index = cmpFileName.fileName.lastIndexOf(SPLIT_STRING);
+                bloomFilterFilesSet.add(cmpFileName.fileName.substring(0, index));
+            }
+            // List of valid BTree files that may or may not have a bloom filter buddy. Will check for buddies below.
+            ArrayList<ComparableFileName> tmpAllBTreeFiles = new ArrayList<ComparableFileName>();
+            cleanupAndGetValidFilesInternal(dev, btreeFilter, btreeFactory, tmpAllBTreeFiles);
+            // Look for buddy bloom filters for all valid BTrees. 
+            // If no buddy is found, delete the file, otherwise add the BTree to allBTreeFiles. 
+            for (ComparableFileName cmpFileName : tmpAllBTreeFiles) {
+                int index = cmpFileName.fileName.lastIndexOf(SPLIT_STRING);
+                String file = cmpFileName.fileName.substring(0, index);
+                if (bloomFilterFilesSet.contains(file)) {
+                    allBTreeFiles.add(cmpFileName);
+                } else {
+                    // Couldn't find the corresponding bloom filter file; thus, delete
+                    // the BTree file.
+                    File invalidBTreeFile = new File(cmpFileName.fullPath);
+                    invalidBTreeFile.delete();
+                }
+            }
+        }
+        // Sanity check.
+        if (allBTreeFiles.size() != allBloomFilterFiles.size()) {
+            throw new HyracksDataException(
+                    "Unequal number of valid BTree and bloom filter files found. Aborting cleanup.");
+        }
+
+        // Trivial cases.
+        if (allBTreeFiles.isEmpty() || allBloomFilterFiles.isEmpty()) {
+            return validFiles;
+        }
+
+        if (allBTreeFiles.size() == 1 && allBloomFilterFiles.size() == 1) {
+            validFiles.add(new LSMComponentFileReferences(allBTreeFiles.get(0).fileRef, null, allBloomFilterFiles
+                    .get(0).fileRef));
+            return validFiles;
+        }
+
+        // Sorts files names from earliest to latest timestamp.
+        Collections.sort(allBTreeFiles);
+        Collections.sort(allBloomFilterFiles);
+
+        List<ComparableFileName> validComparableBTreeFiles = new ArrayList<ComparableFileName>();
+        ComparableFileName lastBTree = allBTreeFiles.get(0);
+        validComparableBTreeFiles.add(lastBTree);
+
+        List<ComparableFileName> validComparableBloomFilterFiles = new ArrayList<ComparableFileName>();
+        ComparableFileName lastBloomFilter = allBloomFilterFiles.get(0);
+        validComparableBloomFilterFiles.add(lastBloomFilter);
+
+        for (int i = 1; i < allBTreeFiles.size(); i++) {
+            ComparableFileName currentBTree = allBTreeFiles.get(i);
+            ComparableFileName currentBloomFilter = allBloomFilterFiles.get(i);
+            // Current start timestamp is greater than last stop timestamp.
+            if (currentBTree.interval[0].compareTo(lastBTree.interval[1]) > 0
+                    && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[1]) > 0) {
+                validComparableBTreeFiles.add(currentBTree);
+                validComparableBloomFilterFiles.add(currentBloomFilter);
+                lastBTree = currentBTree;
+                lastBloomFilter = currentBloomFilter;
+            } else if (currentBTree.interval[0].compareTo(lastBTree.interval[0]) >= 0
+                    && currentBTree.interval[1].compareTo(lastBTree.interval[1]) <= 0
+                    && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[0]) >= 0
+                    && currentBloomFilter.interval[1].compareTo(lastBloomFilter.interval[1]) <= 0) {
+                // Invalid files are completely contained in last interval.
+                File invalidBTreeFile = new File(currentBTree.fullPath);
+                invalidBTreeFile.delete();
+                File invalidBloomFilterFile = new File(currentBloomFilter.fullPath);
+                invalidBloomFilterFile.delete();
+            } else {
+                // This scenario should not be possible.
+                throw new HyracksDataException("Found LSM files with overlapping but not contained timetamp intervals.");
+            }
+        }
+
+        // Sort valid files in reverse lexicographical order, such that newer
+        // files come first.
+        Collections.sort(validComparableBTreeFiles, recencyCmp);
+        Collections.sort(validComparableBloomFilterFiles, recencyCmp);
+
+        Iterator<ComparableFileName> btreeFileIter = validComparableBTreeFiles.iterator();
+        Iterator<ComparableFileName> bloomFilterFileIter = validComparableBloomFilterFiles.iterator();
+        while (btreeFileIter.hasNext() && bloomFilterFileIter.hasNext()) {
+            ComparableFileName cmpBTreeFileName = btreeFileIter.next();
+            ComparableFileName cmpBloomFilterFileName = bloomFilterFileIter.next();
+            validFiles.add(new LSMComponentFileReferences(cmpBTreeFileName.fileRef, null,
+                    cmpBloomFilterFileName.fileRef));
+        }
+
+        return validFiles;
+    }
+}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
new file mode 100644
index 0000000..dfda07b
--- /dev/null
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
@@ -0,0 +1,71 @@
+package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
+
+public class LSMBTreeFlushOperation implements ILSMIOOperation {
+
+    private final ILSMIndexAccessorInternal accessor;
+    private final ILSMComponent flushingComponent;
+    private final FileReference btreeFlushTarget;
+    private final FileReference bloomFilterFlushTarget;
+    private final ILSMIOOperationCallback callback;
+
+    public LSMBTreeFlushOperation(ILSMIndexAccessorInternal accessor, ILSMComponent flushingComponent,
+            FileReference btreeFlushTarget, FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback) {
+        this.accessor = accessor;
+        this.flushingComponent = flushingComponent;
+        this.btreeFlushTarget = btreeFlushTarget;
+        this.bloomFilterFlushTarget = bloomFilterFlushTarget;
+        this.callback = callback;
+    }
+
+    @Override
+    public Set<IODeviceHandle> getReadDevices() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<IODeviceHandle> getWriteDevices() {
+        Set<IODeviceHandle> devs = new HashSet<IODeviceHandle>();
+        devs.add(btreeFlushTarget.getDeviceHandle());
+        devs.add(bloomFilterFlushTarget.getDeviceHandle());
+        return devs;
+    }
+
+    @Override
+    public void perform() throws HyracksDataException, IndexException {
+        accessor.flush(this);
+    }
+
+    @Override
+    public ILSMIOOperationCallback getCallback() {
+        return callback;
+    }
+
+    public FileReference getBTreeFlushTarget() {
+        return btreeFlushTarget;
+    }
+
+    public FileReference getBloomFilterFlushTarget() {
+        return bloomFilterFlushTarget;
+    }
+
+    public ILSMIndexAccessorInternal getAccessor() {
+        return accessor;
+    }
+
+    public ILSMComponent getFlushingComponent() {
+        return flushingComponent;
+    }
+}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponent.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponent.java
index 2251a49..7b5e95d 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponent.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponent.java
@@ -1,24 +1,33 @@
 package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilter;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractImmutableLSMComponent;
 
 public class LSMBTreeImmutableComponent extends AbstractImmutableLSMComponent {
 
     private final BTree btree;
+    private final BloomFilter bloomFilter;
 
-    public LSMBTreeImmutableComponent(BTree btree) {
+    public LSMBTreeImmutableComponent(BTree btree, BloomFilter bloomFilter) {
         this.btree = btree;
+        this.bloomFilter = bloomFilter;
     }
 
     @Override
     public void destroy() throws HyracksDataException {
         btree.deactivate();
         btree.destroy();
+        bloomFilter.deactivate();
+        bloomFilter.destroy();
     }
 
     public BTree getBTree() {
         return btree;
     }
+
+    public BloomFilter getBloomFilter() {
+        return bloomFilter;
+    }
 }
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponentFactory.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponentFactory.java
index 696fc2c..998072f 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponentFactory.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponentFactory.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -25,14 +27,18 @@
 
 public class LSMBTreeImmutableComponentFactory implements ILSMComponentFactory {
     private final TreeIndexFactory<BTree> btreeFactory;
+    private final BloomFilterFactory bloomFilterFactory;
 
-    public LSMBTreeImmutableComponentFactory(TreeIndexFactory<BTree> btreeFactory) {
+    public LSMBTreeImmutableComponentFactory(TreeIndexFactory<BTree> btreeFactory, BloomFilterFactory bloomFilterFactory) {
         this.btreeFactory = btreeFactory;
+        this.bloomFilterFactory = bloomFilterFactory;
     }
 
     @Override
-    public ILSMComponent createLSMComponentInstance(LSMComponentFileReferences cfr) throws IndexException {
-        return new LSMBTreeImmutableComponent(btreeFactory.createIndexInstance(cfr.getInsertIndexFileReference()));
+    public ILSMComponent createLSMComponentInstance(LSMComponentFileReferences cfr) throws IndexException,
+            HyracksDataException {
+        return new LSMBTreeImmutableComponent(btreeFactory.createIndexInstance(cfr.getInsertIndexFileReference()),
+                bloomFilterFactory.createBloomFiltertInstance(cfr.getBloomFilterFileReference()));
     }
 
     @Override
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
index a3a7097..ddfe365 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
@@ -15,7 +15,6 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
 
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -35,15 +34,18 @@
     private final ILSMIndexAccessorInternal accessor;
     private final List<ILSMComponent> mergingComponents;
     private final ITreeIndexCursor cursor;
-    private final FileReference mergeTarget;
+    private final FileReference btreeMergeTarget;
+    private final FileReference bloomFilterMergeTarget;
     private final ILSMIOOperationCallback callback;
 
     public LSMBTreeMergeOperation(ILSMIndexAccessorInternal accessor, List<ILSMComponent> mergingComponents,
-            ITreeIndexCursor cursor, FileReference mergeTarget, ILSMIOOperationCallback callback) {
+            ITreeIndexCursor cursor, FileReference btreeMergeTarget, FileReference bloomFilterMergeTarget,
+            ILSMIOOperationCallback callback) {
         this.accessor = accessor;
         this.mergingComponents = mergingComponents;
         this.cursor = cursor;
-        this.mergeTarget = mergeTarget;
+        this.btreeMergeTarget = btreeMergeTarget;
+        this.bloomFilterMergeTarget = bloomFilterMergeTarget;
         this.callback = callback;
     }
 
@@ -59,7 +61,10 @@
 
     @Override
     public Set<IODeviceHandle> getWriteDevices() {
-        return Collections.singleton(mergeTarget.getDeviceHandle());
+        Set<IODeviceHandle> devs = new HashSet<IODeviceHandle>();
+        devs.add(btreeMergeTarget.getDeviceHandle());
+        devs.add(bloomFilterMergeTarget.getDeviceHandle());
+        return devs;
     }
 
     @Override
@@ -72,8 +77,12 @@
         return callback;
     }
 
-    public FileReference getMergeTarget() {
-        return mergeTarget;
+    public FileReference getBTreeMergeTarget() {
+        return btreeMergeTarget;
+    }
+
+    public FileReference getBloomFilterMergeTarget() {
+        return bloomFilterMergeTarget;
     }
 
     public ITreeIndexCursor getCursor() {
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java
index 3706230..154ecf6 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java
@@ -19,6 +19,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
@@ -29,6 +30,7 @@
 import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.freepage.LinkedListFreePageManagerFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTreeFileManager;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.tuples.LSMBTreeCopyTupleWriterFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleWriterFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.IInMemoryBufferCache;
@@ -38,7 +40,6 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexFileManager;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -80,13 +81,20 @@
                 typeTraits.length);
         TreeIndexFactory<BTree> bulkLoadBTreeFactory = new BTreeFactory(diskBufferCache, diskFileMapProvider,
                 freePageManagerFactory, interiorFrameFactory, insertLeafFrameFactory, cmpFactories, typeTraits.length);
-        ILSMIndexFileManager fileNameManager = new LSMIndexFileManager(ioManager, diskFileMapProvider, file,
+
+        int keyFields[] = new int[cmpFactories.length];
+        for (int i = 0; i < cmpFactories.length; i++) {
+            keyFields[i] = i;
+        }
+        BloomFilterFactory bloomFilterFactory = new BloomFilterFactory(diskBufferCache, diskFileMapProvider, keyFields);
+
+        ILSMIndexFileManager fileNameManager = new LSMBTreeFileManager(ioManager, diskFileMapProvider, file,
                 diskBTreeFactory, startIODeviceIndex);
 
         LSMBTree lsmTree = new LSMBTree(memBufferCache, memFreePageManager, interiorFrameFactory,
                 insertLeafFrameFactory, deleteLeafFrameFactory, fileNameManager, diskBTreeFactory,
-                bulkLoadBTreeFactory, diskFileMapProvider, typeTraits.length, cmpFactories, mergePolicy,
-                opTrackerFactory, ioScheduler, ioOpCallbackProvider);
+                bulkLoadBTreeFactory, bloomFilterFactory, diskFileMapProvider, typeTraits.length, cmpFactories,
+                mergePolicy, opTrackerFactory, ioScheduler, ioOpCallbackProvider);
         return lsmTree;
     }
 }
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponentFactory.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponentFactory.java
index d00b805..1f3a2b7 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponentFactory.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponentFactory.java
@@ -1,11 +1,13 @@
 package edu.uci.ics.hyracks.storage.am.lsm.common.api;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 
 public interface ILSMComponentFactory {
-    public ILSMComponent createLSMComponentInstance(LSMComponentFileReferences cfr) throws IndexException;
+    public ILSMComponent createLSMComponentInstance(LSMComponentFileReferences cfr) throws IndexException,
+            HyracksDataException;
 
     public IBufferCache getBufferCache();
 }
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index 3d2ebcd..e2bd77c 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -48,7 +48,6 @@
     protected final IFileMapProvider fileMapProvider;
 
     // baseDir should reflect dataset name and partition name.
-    protected FileReference file;
     protected String baseDir;
     protected final Format formatter = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS");
     protected final Comparator<String> cmp = new FileNameComparator();
@@ -61,7 +60,6 @@
 
     public AbstractLSMIndexFileManager(IIOManager ioManager, IFileMapProvider fileMapProvider, FileReference file,
             TreeIndexFactory<? extends ITreeIndex> treeFactory, int startIODeviceIndex) {
-        this.file = file;
         this.baseDir = file.getFile().getPath();
         if (!baseDir.endsWith(System.getProperty("file.separator"))) {
             baseDir += System.getProperty("file.separator");
@@ -100,9 +98,21 @@
         }
     }
 
-    abstract protected void cleanupAndGetValidFilesInternal(IODeviceHandle dev, FilenameFilter filter,
+    protected void cleanupAndGetValidFilesInternal(IODeviceHandle dev, FilenameFilter filter,
             TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<ComparableFileName> allFiles)
-            throws HyracksDataException, IndexException;
+            throws HyracksDataException, IndexException {
+        File dir = new File(dev.getPath(), baseDir);
+        String[] files = dir.list(filter);
+        for (String fileName : files) {
+            File file = new File(dir.getPath() + File.separator + fileName);
+            FileReference fileRef = new FileReference(file);
+            if (isValidTreeIndex(treeFactory.createIndexInstance(fileRef))) {
+                allFiles.add(new ComparableFileName(fileRef));
+            } else {
+                file.delete();
+            }
+        }
+    }
 
     @Override
     public void createDirs() {
@@ -145,7 +155,7 @@
         Date date = new Date();
         String ts = formatter.format(date);
         // Begin timestamp and end timestamp are identical since it is a flush
-        return new LSMComponentFileReferences(createFlushFile(baseDir + ts + SPLIT_STRING + ts), null);
+        return new LSMComponentFileReferences(createFlushFile(baseDir + ts + SPLIT_STRING + ts), null, null);
     }
 
     @Override
@@ -155,7 +165,7 @@
         String[] lastTimestampRange = lastFileName.split(SPLIT_STRING);
         // Get the range of timestamps by taking the earliest and the latest timestamps
         return new LSMComponentFileReferences(createMergeFile(baseDir + firstTimestampRange[0] + SPLIT_STRING
-                + lastTimestampRange[1]), null);
+                + lastTimestampRange[1]), null, null);
     }
 
     @Override
@@ -177,7 +187,7 @@
         }
 
         if (allFiles.size() == 1) {
-            validFiles.add(new LSMComponentFileReferences(allFiles.get(0).fileRef, null));
+            validFiles.add(new LSMComponentFileReferences(allFiles.get(0).fileRef, null, null));
             return validFiles;
         }
 
@@ -209,7 +219,7 @@
         // Sort valid files in reverse lexicographical order, such that newer files come first.
         Collections.sort(validComparableFiles, recencyCmp);
         for (ComparableFileName cmpFileName : validComparableFiles) {
-            validFiles.add(new LSMComponentFileReferences(cmpFileName.fileRef, null));
+            validFiles.add(new LSMComponentFileReferences(cmpFileName.fileRef, null, null));
         }
 
         return validFiles;
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java
index ac6ddcf..019dca4 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java
@@ -24,9 +24,14 @@
     // This FileReference for the delete index (if any). For example, this will be the the FileReference of the buddy BTree in one component of the LSM-RTree.
     private final FileReference deleteIndexFileReference;
 
-    public LSMComponentFileReferences(FileReference insertIndexFileReference, FileReference deleteIndexFileReference) {
+    // This FileReference for the bloom filter (if any). 
+    private final FileReference bloomFilterFileReference;
+
+    public LSMComponentFileReferences(FileReference insertIndexFileReference, FileReference deleteIndexFileReference,
+            FileReference bloomFilterFileReference) {
         this.insertIndexFileReference = insertIndexFileReference;
         this.deleteIndexFileReference = deleteIndexFileReference;
+        this.bloomFilterFileReference = bloomFilterFileReference;
     }
 
     public FileReference getInsertIndexFileReference() {
@@ -36,4 +41,8 @@
     public FileReference getDeleteIndexFileReference() {
         return deleteIndexFileReference;
     }
+
+    public FileReference getBloomFilterFileReference() {
+        return bloomFilterFileReference;
+    }
 }
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMFlushOperation.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMFlushOperation.java
deleted file mode 100644
index 6ce1f08..0000000
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMFlushOperation.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-
-import java.util.Collections;
-import java.util.Set;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.io.IODeviceHandle;
-import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
-
-public class LSMFlushOperation implements ILSMIOOperation {
-
-    private final ILSMIndexAccessorInternal accessor;
-    private final ILSMComponent flushingComponent;
-    private final FileReference flushTarget;
-    private final ILSMIOOperationCallback callback;
-
-    public LSMFlushOperation(ILSMIndexAccessorInternal accessor, ILSMComponent flushingComponent,
-            FileReference flushTarget, ILSMIOOperationCallback callback) {
-        this.accessor = accessor;
-        this.flushingComponent = flushingComponent;
-        this.flushTarget = flushTarget;
-        this.callback = callback;
-    }
-
-    @Override
-    public Set<IODeviceHandle> getReadDevices() {
-        return Collections.emptySet();
-    }
-
-    @Override
-    public Set<IODeviceHandle> getWriteDevices() {
-        return Collections.singleton(flushTarget.getDeviceHandle());
-    }
-
-    @Override
-    public void perform() throws HyracksDataException, IndexException {
-        accessor.flush(this);
-    }
-
-    @Override
-    public ILSMIOOperationCallback getCallback() {
-        return callback;
-    }
-
-    public FileReference getFlushTarget() {
-        return flushTarget;
-    }
-
-    public ILSMIndexAccessorInternal getAccessor() {
-        return accessor;
-    }
-
-    public ILSMComponent getFlushingComponent() {
-        return flushingComponent;
-    }
-}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexFileManager.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexFileManager.java
deleted file mode 100644
index a7a16ac..0000000
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexFileManager.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.util.ArrayList;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.io.IIOManager;
-import edu.uci.ics.hyracks.api.io.IODeviceHandle;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
-import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
-
-public class LSMIndexFileManager extends AbstractLSMIndexFileManager {
-
-    public LSMIndexFileManager(IIOManager ioManager, IFileMapProvider fileMapProvider, FileReference file,
-            TreeIndexFactory<? extends ITreeIndex> treeFactory, int startIODeviceIndex) {
-        super(ioManager, fileMapProvider, file, treeFactory, startIODeviceIndex);
-    }
-
-    protected void cleanupAndGetValidFilesInternal(IODeviceHandle dev, FilenameFilter filter,
-            TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<ComparableFileName> allFiles)
-            throws HyracksDataException, IndexException {
-        File dir = new File(dev.getPath(), baseDir);
-        String[] files = dir.list(filter);
-        for (String fileName : files) {
-            File file = new File(dir.getPath() + File.separator + fileName);
-            FileReference fileRef = new FileReference(file);
-            if (isValidTreeIndex(treeFactory.createIndexInstance(fileRef))) {
-                allFiles.add(new ComparableFileName(fileRef));
-            } else {
-                file.delete();
-            }
-        }
-    }
-}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexBulkLoadOperatorDescriptor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexBulkLoadOperatorDescriptor.java
index 6b07608..da3cad5 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexBulkLoadOperatorDescriptor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexBulkLoadOperatorDescriptor.java
@@ -37,13 +37,14 @@
 
     private final int[] fieldPermutation;
     private final boolean verifyInput;
+    private final long numElementsHint;
 
     public LSMInvertedIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] fieldPermutation,
-            boolean verifyInput, IStorageManagerInterface storageManager, IFileSplitProvider fileSplitProvider,
-            IIndexLifecycleManagerProvider lifecycleManagerProvider, ITypeTraits[] tokenTypeTraits,
-            IBinaryComparatorFactory[] tokenComparatorFactories, ITypeTraits[] invListsTypeTraits,
-            IBinaryComparatorFactory[] invListComparatorFactories, IBinaryTokenizerFactory tokenizerFactory,
-            IIndexDataflowHelperFactory invertedIndexDataflowHelperFactory,
+            boolean verifyInput, long numElementsHint, IStorageManagerInterface storageManager,
+            IFileSplitProvider fileSplitProvider, IIndexLifecycleManagerProvider lifecycleManagerProvider,
+            ITypeTraits[] tokenTypeTraits, IBinaryComparatorFactory[] tokenComparatorFactories,
+            ITypeTraits[] invListsTypeTraits, IBinaryComparatorFactory[] invListComparatorFactories,
+            IBinaryTokenizerFactory tokenizerFactory, IIndexDataflowHelperFactory invertedIndexDataflowHelperFactory,
             IModificationOperationCallbackFactory modificationOpCallbackFactory) {
         super(spec, 1, 0, null, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
                 tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
@@ -51,12 +52,13 @@
                 NoOpOperationCallbackFactory.INSTANCE, modificationOpCallbackFactory);
         this.fieldPermutation = fieldPermutation;
         this.verifyInput = verifyInput;
+        this.numElementsHint = numElementsHint;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
         return new IndexBulkLoadOperatorNodePushable(this, ctx, partition, fieldPermutation, 1.0f, verifyInput,
-                recordDescProvider);
+                numElementsHint, recordDescProvider);
     }
 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index da6bc28..159352f 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -414,7 +414,7 @@
         memBTreeAccessor.search(scanCursor, nullPred);
 
         // Bulk load the disk inverted index from the in-memory inverted index.
-        IIndexBulkLoader invIndexBulkLoader = diskInvertedIndex.createBulkLoader(1.0f, false);
+        IIndexBulkLoader invIndexBulkLoader = diskInvertedIndex.createBulkLoader(1.0f, false, 0L);
         try {
             while (scanCursor.hasNext()) {
                 scanCursor.next();
@@ -435,7 +435,7 @@
         deletedKeysBTreeAccessor.search(deletedKeysScanCursor, nullPred);
 
         // Bulk load the deleted-keys BTree.
-        IIndexBulkLoader deletedKeysBTreeBulkLoader = diskDeletedKeysBTree.createBulkLoader(1.0f, false);
+        IIndexBulkLoader deletedKeysBTreeBulkLoader = diskDeletedKeysBTree.createBulkLoader(1.0f, false, 0L);
         try {
             while (deletedKeysScanCursor.hasNext()) {
                 deletedKeysScanCursor.next();
@@ -490,7 +490,7 @@
 
         IInvertedIndex mergedDiskInvertedIndex = component.getInvIndex();
         IIndexCursor cursor = mergeOp.getCursor();
-        IIndexBulkLoader invIndexBulkLoader = mergedDiskInvertedIndex.createBulkLoader(1.0f, true);
+        IIndexBulkLoader invIndexBulkLoader = mergedDiskInvertedIndex.createBulkLoader(1.0f, true, 0L);
         try {
             while (cursor.hasNext()) {
                 cursor.next();
@@ -518,15 +518,17 @@
     }
 
     @Override
-    public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput) throws IndexException {
-        return new LSMInvertedIndexBulkLoader(fillFactor, verifyInput);
+    public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+            throws IndexException {
+        return new LSMInvertedIndexBulkLoader(fillFactor, verifyInput, numElementsHint);
     }
 
     public class LSMInvertedIndexBulkLoader implements IIndexBulkLoader {
         private final ILSMComponent component;
         private final IIndexBulkLoader invIndexBulkLoader;
 
-        public LSMInvertedIndexBulkLoader(float fillFactor, boolean verifyInput) throws IndexException {
+        public LSMInvertedIndexBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+                throws IndexException {
             // Note that by using a flush target file name, we state that the
             // new bulk loaded tree is "newer" than any other merged tree.
             try {
@@ -537,7 +539,7 @@
                 throw new TreeIndexException(e);
             }
             invIndexBulkLoader = ((LSMInvertedIndexImmutableComponent) component).getInvIndex().createBulkLoader(
-                    fillFactor, verifyInput);
+                    fillFactor, verifyInput, numElementsHint);
         }
 
         @Override
@@ -580,7 +582,7 @@
             FileReference dictBTreeFileRef, FileReference btreeFileRef, boolean create) throws HyracksDataException,
             IndexException {
         LSMInvertedIndexImmutableComponent component = (LSMInvertedIndexImmutableComponent) factory
-                .createLSMComponentInstance(new LSMComponentFileReferences(dictBTreeFileRef, btreeFileRef));
+                .createLSMComponentInstance(new LSMComponentFileReferences(dictBTreeFileRef, btreeFileRef, null));
         if (create) {
             component.getInvIndex().create();
             component.getDeletedKeysBTree().create();
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
index 8ffb0bd..21a11dc 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
@@ -29,14 +29,14 @@
 import edu.uci.ics.hyracks.api.io.IIOManager;
 import edu.uci.ics.hyracks.api.io.IODeviceHandle;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexFileManager;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexFileNameMapper;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
 
 // TODO: Refactor for better code sharing with other file managers.
-public class LSMInvertedIndexFileManager extends LSMIndexFileManager implements IInvertedIndexFileNameMapper {
+public class LSMInvertedIndexFileManager extends AbstractLSMIndexFileManager implements IInvertedIndexFileNameMapper {
     private static final String DICT_BTREE_SUFFIX = "b";
     private static final String INVLISTS_SUFFIX = "i";
     private static final String DELETED_KEYS_BTREE_SUFFIX = "d";
@@ -69,7 +69,7 @@
         String baseName = baseDir + ts + SPLIT_STRING + ts;
         // Begin timestamp and end timestamp are identical since it is a flush
         return new LSMComponentFileReferences(createFlushFile(baseName + SPLIT_STRING + DICT_BTREE_SUFFIX),
-                createFlushFile(baseName + SPLIT_STRING + DELETED_KEYS_BTREE_SUFFIX));
+                createFlushFile(baseName + SPLIT_STRING + DELETED_KEYS_BTREE_SUFFIX), null);
     }
 
     @Override
@@ -81,7 +81,7 @@
         String baseName = baseDir + firstTimestampRange[0] + SPLIT_STRING + lastTimestampRange[1];
         // Get the range of timestamps by taking the earliest and the latest timestamps
         return new LSMComponentFileReferences(createMergeFile(baseName + SPLIT_STRING + DICT_BTREE_SUFFIX),
-                createMergeFile(baseName + SPLIT_STRING + DELETED_KEYS_BTREE_SUFFIX));
+                createMergeFile(baseName + SPLIT_STRING + DELETED_KEYS_BTREE_SUFFIX), null);
     }
 
     @Override
@@ -132,7 +132,7 @@
 
         if (allDictBTreeFiles.size() == 1 && allDeletedKeysBTreeFiles.size() == 1) {
             validFiles.add(new LSMComponentFileReferences(allDictBTreeFiles.get(0).fileRef, allDeletedKeysBTreeFiles
-                    .get(0).fileRef));
+                    .get(0).fileRef, null));
             return validFiles;
         }
 
@@ -183,7 +183,8 @@
         while (dictBTreeFileIter.hasNext() && deletedKeysBTreeIter.hasNext()) {
             ComparableFileName cmpDictBTreeFile = dictBTreeFileIter.next();
             ComparableFileName cmpDeletedKeysBTreeFile = deletedKeysBTreeIter.next();
-            validFiles.add(new LSMComponentFileReferences(cmpDictBTreeFile.fileRef, cmpDeletedKeysBTreeFile.fileRef));
+            validFiles.add(new LSMComponentFileReferences(cmpDictBTreeFile.fileRef, cmpDeletedKeysBTreeFile.fileRef,
+                    null));
         }
 
         return validFiles;
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
index 668250c..d5a074e 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
@@ -191,7 +191,8 @@
     }
 
     @Override
-    public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput) throws IndexException {
+    public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+            throws IndexException {
         throw new UnsupportedOperationException("Bulk load not supported by in-memory inverted index.");
     }
 
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index f1552eb..afeaf90 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -76,7 +76,7 @@
     protected final int invListEndPageIdField;
     protected final int invListStartOffField;
     protected final int invListNumElementsField;
-    
+
     // Type traits to be appended to the token type trait which finally form the BTree field type traits.
     protected static final ITypeTraits[] btreeValueTypeTraits = new ITypeTraits[4];
     static {
@@ -283,7 +283,7 @@
                 btreeTuple.getFieldStart(invListNumElementsField));
         listCursor.reset(startPageId, endPageId, startOff, numElements);
     }
-    
+
     public final class OnDiskInvertedIndexBulkLoader implements IIndexBulkLoader {
         private final ArrayTupleBuilder btreeTupleBuilder;
         private final ArrayTupleReference btreeTupleReference;
@@ -302,8 +302,8 @@
         private final boolean verifyInput;
         private final MultiComparator allCmp;
 
-        public OnDiskInvertedIndexBulkLoader(float btreeFillFactor, boolean verifyInput, int startPageId, int fileId)
-                throws IndexException, HyracksDataException {
+        public OnDiskInvertedIndexBulkLoader(float btreeFillFactor, boolean verifyInput, long numElementsHint,
+                int startPageId, int fileId) throws IndexException, HyracksDataException {
             this.verifyInput = verifyInput;
             this.tokenCmp = MultiComparator.create(btree.getComparatorFactories());
             this.invListCmp = MultiComparator.create(invListCmpFactories);
@@ -316,7 +316,7 @@
             this.btreeTupleReference = new ArrayTupleReference();
             this.lastTupleBuilder = new ArrayTupleBuilder(numTokenFields + numInvListKeys);
             this.lastTuple = new ArrayTupleReference();
-            this.btreeBulkloader = btree.createBulkLoader(btreeFillFactor, verifyInput);
+            this.btreeBulkloader = btree.createBulkLoader(btreeFillFactor, verifyInput, numElementsHint);
             currentPageId = startPageId;
             currentPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, currentPageId), true);
             currentPage.acquireWriteLatch();
@@ -477,7 +477,7 @@
             this.index = index;
             this.searcher = searcher;
         }
-        
+
         @Override
         public IIndexCursor createSearchCursor() {
             return new OnDiskInvertedIndexSearchCursor(searcher, index.getInvListTypeTraits().length);
@@ -563,9 +563,10 @@
     }
 
     @Override
-    public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput) throws IndexException {
+    public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+            throws IndexException {
         try {
-            return new OnDiskInvertedIndexBulkLoader(fillFactor, verifyInput, rootPageId, fileId);
+            return new OnDiskInvertedIndexBulkLoader(fillFactor, verifyInput, numElementsHint, rootPageId, fileId);
         } catch (HyracksDataException e) {
             throw new InvertedIndexException(e);
         }
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index 5a1482f..0e85cfd 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -229,7 +229,7 @@
             FileReference deleteFileRef, boolean createComponent) throws HyracksDataException, IndexException {
         // Create new tree instance.
         LSMRTreeImmutableComponent component = (LSMRTreeImmutableComponent) factory
-                .createLSMComponentInstance(new LSMComponentFileReferences(insertFileRef, deleteFileRef));
+                .createLSMComponentInstance(new LSMComponentFileReferences(insertFileRef, deleteFileRef, null));
         if (createComponent) {
             component.getRTree().create();
             if (component.getBTree() != null) {
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index e43e525..ea6548e 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -207,7 +207,7 @@
     @Override
     public ILSMComponent flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
         LSMRTreeFlushOperation flushOp = (LSMRTreeFlushOperation) operation;
-        LSMRTreeMutableComponent flushingComponent = flushOp.getFlushingComponent();
+        LSMRTreeMutableComponent flushingComponent = (LSMRTreeMutableComponent) flushOp.getFlushingComponent();
         // Renaming order is critical because we use assume ordering when we
         // read the file names when we open the tree.
         // The RTree should be renamed before the BTree.
@@ -251,7 +251,7 @@
         if (!isEmpty) {
             rTreeTupleSorter.sort();
         }
-        rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false);
+        rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false, 0L);
         cursor = rTreeTupleSorter;
 
         try {
@@ -274,7 +274,7 @@
         BTree diskBTree = component.getBTree();
 
         // BulkLoad the tuples from the in-memory tree into the new disk BTree.
-        IIndexBulkLoader bTreeBulkloader = diskBTree.createBulkLoader(1.0f, false);
+        IIndexBulkLoader bTreeBulkloader = diskBTree.createBulkLoader(1.0f, false, 0L);
         try {
             while (btreeScanCursor.hasNext()) {
                 btreeScanCursor.next();
@@ -330,7 +330,7 @@
         RTree mergedRTree = component.getRTree();
         BTree mergedBTree = component.getBTree();
 
-        IIndexBulkLoader bulkloader = mergedRTree.createBulkLoader(1.0f, false);
+        IIndexBulkLoader bulkloader = mergedRTree.createBulkLoader(1.0f, false, 0L);
         try {
             while (cursor.hasNext()) {
                 cursor.next();
@@ -343,7 +343,7 @@
         bulkloader.end();
 
         // Load an empty BTree tree.
-        mergedBTree.createBulkLoader(1.0f, false).end();
+        mergedBTree.createBulkLoader(1.0f, false, 0L).end();
 
         return new LSMRTreeImmutableComponent(mergedRTree, mergedBTree);
     }
@@ -377,15 +377,17 @@
     }
 
     @Override
-    public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput) throws TreeIndexException {
-        return new LSMRTreeBulkLoader(fillLevel, verifyInput);
+    public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
+            throws TreeIndexException {
+        return new LSMRTreeBulkLoader(fillLevel, verifyInput, numElementsHint);
     }
 
     public class LSMRTreeBulkLoader implements IIndexBulkLoader {
         private final ILSMComponent component;
         private final IIndexBulkLoader bulkLoader;
 
-        public LSMRTreeBulkLoader(float fillFactor, boolean verifyInput) throws TreeIndexException {
+        public LSMRTreeBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+                throws TreeIndexException {
             // Note that by using a flush target file name, we state that the
             // new bulk loaded tree is "newer" than any other merged tree.
             try {
@@ -395,7 +397,8 @@
             } catch (IndexException e) {
                 throw new TreeIndexException(e);
             }
-            bulkLoader = ((LSMRTreeImmutableComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput);
+            bulkLoader = ((LSMRTreeImmutableComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput,
+                    numElementsHint);
         }
 
         @Override
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
index 7ceecad..041fda1 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
@@ -30,12 +30,12 @@
 import edu.uci.ics.hyracks.api.io.IODeviceHandle;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexFileManager;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
 
-public class LSMRTreeFileManager extends LSMIndexFileManager {
+public class LSMRTreeFileManager extends AbstractLSMIndexFileManager {
     private static final String RTREE_STRING = "r";
     private static final String BTREE_STRING = "b";
 
@@ -69,7 +69,7 @@
         String baseName = baseDir + ts + SPLIT_STRING + ts;
         // Begin timestamp and end timestamp are identical since it is a flush
         return new LSMComponentFileReferences(createFlushFile(baseName + SPLIT_STRING + RTREE_STRING),
-                createFlushFile(baseName + SPLIT_STRING + BTREE_STRING));
+                createFlushFile(baseName + SPLIT_STRING + BTREE_STRING), null);
     }
 
     @Override
@@ -81,7 +81,7 @@
         String baseName = baseDir + firstTimestampRange[0] + SPLIT_STRING + lastTimestampRange[1];
         // Get the range of timestamps by taking the earliest and the latest timestamps
         return new LSMComponentFileReferences(createMergeFile(baseName + SPLIT_STRING + RTREE_STRING),
-                createMergeFile(baseName + SPLIT_STRING + BTREE_STRING));
+                createMergeFile(baseName + SPLIT_STRING + BTREE_STRING), null);
     }
 
     @Override
@@ -127,7 +127,8 @@
         }
 
         if (allRTreeFiles.size() == 1 && allBTreeFiles.size() == 1) {
-            validFiles.add(new LSMComponentFileReferences(allRTreeFiles.get(0).fileRef, allBTreeFiles.get(0).fileRef));
+            validFiles.add(new LSMComponentFileReferences(allRTreeFiles.get(0).fileRef, allBTreeFiles.get(0).fileRef,
+                    null));
             return validFiles;
         }
 
@@ -178,7 +179,7 @@
         while (rtreeFileIter.hasNext() && btreeFileIter.hasNext()) {
             ComparableFileName cmpRTreeFileName = rtreeFileIter.next();
             ComparableFileName cmpBTreeFileName = btreeFileIter.next();
-            validFiles.add(new LSMComponentFileReferences(cmpRTreeFileName.fileRef, cmpBTreeFileName.fileRef));
+            validFiles.add(new LSMComponentFileReferences(cmpRTreeFileName.fileRef, cmpBTreeFileName.fileRef, null));
         }
 
         return validFiles;
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
index 8698a1d..8c5c6fe 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
@@ -8,6 +8,7 @@
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.io.IODeviceHandle;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
@@ -15,12 +16,12 @@
 public class LSMRTreeFlushOperation implements ILSMIOOperation {
 
     private final ILSMIndexAccessorInternal accessor;
-    private final LSMRTreeMutableComponent flushingComponent;
+    private final ILSMComponent flushingComponent;
     private final FileReference rtreeFlushTarget;
     private final FileReference btreeFlushTarget;
     private final ILSMIOOperationCallback callback;
 
-    public LSMRTreeFlushOperation(ILSMIndexAccessorInternal accessor, LSMRTreeMutableComponent flushingComponent,
+    public LSMRTreeFlushOperation(ILSMIndexAccessorInternal accessor, ILSMComponent flushingComponent,
             FileReference rtreeFlushTarget, FileReference btreeFlushTarget, ILSMIOOperationCallback callback) {
         this.accessor = accessor;
         this.flushingComponent = flushingComponent;
@@ -38,7 +39,9 @@
     public Set<IODeviceHandle> getWriteDevices() {
         Set<IODeviceHandle> devs = new HashSet<IODeviceHandle>();
         devs.add(rtreeFlushTarget.getDeviceHandle());
-        devs.add(btreeFlushTarget.getDeviceHandle());
+        if (btreeFlushTarget != null) {
+            devs.add(btreeFlushTarget.getDeviceHandle());
+        }
         return devs;
     }
 
@@ -60,7 +63,7 @@
         return btreeFlushTarget;
     }
 
-    public LSMRTreeMutableComponent getFlushingComponent() {
+    public ILSMComponent getFlushingComponent() {
         return flushingComponent;
     }
 }
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
index 970b253..d6a8693 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
@@ -1,6 +1,5 @@
 package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
 
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -40,14 +39,21 @@
         for (ILSMComponent o : mergingComponents) {
             LSMRTreeImmutableComponent component = (LSMRTreeImmutableComponent) o;
             devs.add(component.getRTree().getFileReference().getDeviceHandle());
-            devs.add(component.getBTree().getFileReference().getDeviceHandle());
+            if (component.getBTree() != null) {
+                devs.add(component.getBTree().getFileReference().getDeviceHandle());
+            }
         }
         return devs;
     }
 
     @Override
     public Set<IODeviceHandle> getWriteDevices() {
-        return Collections.singleton(rtreeMergeTarget.getDeviceHandle());
+        Set<IODeviceHandle> devs = new HashSet<IODeviceHandle>();
+        devs.add(rtreeMergeTarget.getDeviceHandle());
+        if (btreeMergeTarget != null) {
+            devs.add(btreeMergeTarget.getDeviceHandle());
+        }
+        return devs;
     }
 
     @Override
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 824b8ef..ea3fc46 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -52,7 +52,6 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMFlushOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
@@ -205,13 +204,13 @@
         opCtx.setOperation(IndexOperation.FLUSH);
         opCtx.getComponentHolder().add(flushingComponent);
         ILSMIndexAccessorInternal accessor = new LSMRTreeWithAntiMatterTuplesAccessor(lsmHarness, opCtx);
-        ioScheduler.scheduleOperation(new LSMFlushOperation(accessor, flushingComponent, relFlushFileRefs
-                .getInsertIndexFileReference(), callback));
+        ioScheduler.scheduleOperation(new LSMRTreeFlushOperation(accessor, flushingComponent, relFlushFileRefs
+                .getInsertIndexFileReference(), null, callback));
     }
 
     @Override
     public ILSMComponent flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
-        LSMFlushOperation flushOp = (LSMFlushOperation) operation;
+        LSMRTreeFlushOperation flushOp = (LSMRTreeFlushOperation) operation;
         // Renaming order is critical because we use assume ordering when we
         // read the file names when we open the tree.
         // The RTree should be renamed before the BTree.
@@ -221,8 +220,8 @@
         RTreeSearchCursor rtreeScanCursor = (RTreeSearchCursor) memRTreeAccessor.createSearchCursor();
         SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
         memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
-        LSMRTreeImmutableComponent component = createDiskComponent(componentFactory, flushOp.getFlushTarget(), null,
-                true);
+        LSMRTreeImmutableComponent component = createDiskComponent(componentFactory, flushOp.getRTreeFlushTarget(),
+                null, true);
         RTree diskRTree = component.getRTree();
 
         // scan the memory BTree
@@ -281,7 +280,7 @@
             bTreeTupleSorter.sort();
         }
 
-        IIndexBulkLoader rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false);
+        IIndexBulkLoader rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false, 0L);
         LSMRTreeFlushCursor cursor = new LSMRTreeFlushCursor(rTreeTupleSorter, bTreeTupleSorter, comparatorFields,
                 linearizerArray);
         cursor.open(null, null);
@@ -337,7 +336,7 @@
         LSMRTreeImmutableComponent component = createDiskComponent(componentFactory, mergeOp.getRTreeMergeTarget(),
                 null, true);
         RTree mergedRTree = component.getRTree();
-        IIndexBulkLoader bulkloader = mergedRTree.createBulkLoader(1.0f, false);
+        IIndexBulkLoader bulkloader = mergedRTree.createBulkLoader(1.0f, false, 0L);
         try {
             while (cursor.hasNext()) {
                 cursor.next();
@@ -374,8 +373,9 @@
     }
 
     @Override
-    public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput) throws TreeIndexException {
-        return new LSMRTreeWithAntiMatterTuplesBulkLoader(fillLevel, verifyInput);
+    public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
+            throws TreeIndexException {
+        return new LSMRTreeWithAntiMatterTuplesBulkLoader(fillLevel, verifyInput, numElementsHint);
     }
 
     private ILSMComponent createBulkLoadTarget() throws HyracksDataException, IndexException {
@@ -387,7 +387,8 @@
         private final ILSMComponent component;
         private final IIndexBulkLoader bulkLoader;
 
-        public LSMRTreeWithAntiMatterTuplesBulkLoader(float fillFactor, boolean verifyInput) throws TreeIndexException {
+        public LSMRTreeWithAntiMatterTuplesBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+                throws TreeIndexException {
             // Note that by using a flush target file name, we state that the
             // new bulk loaded tree is "newer" than any other merged tree.
             try {
@@ -397,7 +398,8 @@
             } catch (IndexException e) {
                 throw new TreeIndexException(e);
             }
-            bulkLoader = ((LSMRTreeImmutableComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput);
+            bulkLoader = ((LSMRTreeImmutableComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput,
+                    numElementsHint);
         }
 
         @Override
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFileManager.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFileManager.java
new file mode 100644
index 0000000..10b982f
--- /dev/null
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFileManager.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+public class LSMRTreeWithAntiMatterTuplesFileManager extends AbstractLSMIndexFileManager {
+
+    private final TreeIndexFactory<? extends ITreeIndex> rtreeFactory;
+
+    public LSMRTreeWithAntiMatterTuplesFileManager(IIOManager ioManager, IFileMapProvider fileMapProvider,
+            FileReference file, TreeIndexFactory<? extends ITreeIndex> rtreeFactory, int startIODeviceIndex) {
+        super(ioManager, fileMapProvider, file, null, startIODeviceIndex);
+        this.rtreeFactory = rtreeFactory;
+    }
+
+    @Override
+    public LSMComponentFileReferences getRelFlushFileReference() {
+        Date date = new Date();
+        String ts = formatter.format(date);
+        // Begin timestamp and end timestamp are identical since it is a flush
+        return new LSMComponentFileReferences(createFlushFile(baseDir + ts + SPLIT_STRING + ts), null, null);
+    }
+
+    @Override
+    public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName)
+            throws HyracksDataException {
+        String[] firstTimestampRange = firstFileName.split(SPLIT_STRING);
+        String[] lastTimestampRange = lastFileName.split(SPLIT_STRING);
+        // Get the range of timestamps by taking the earliest and the latest timestamps
+        return new LSMComponentFileReferences(createMergeFile(baseDir + firstTimestampRange[0] + SPLIT_STRING
+                + lastTimestampRange[1]), null, null);
+    }
+
+    private static FilenameFilter fileNameFilter = new FilenameFilter() {
+        public boolean accept(File dir, String name) {
+            return !name.startsWith(".");
+        }
+    };
+
+    @Override
+    public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException, IndexException {
+        List<LSMComponentFileReferences> validFiles = new ArrayList<LSMComponentFileReferences>();
+        ArrayList<ComparableFileName> allFiles = new ArrayList<ComparableFileName>();
+
+        // Gather files from all IODeviceHandles and delete invalid files
+        // There are two types of invalid files:
+        // (1) The isValid flag is not set
+        // (2) The file's interval is contained by some other file
+        // Here, we only filter out (1).
+        for (IODeviceHandle dev : ioManager.getIODevices()) {
+            cleanupAndGetValidFilesInternal(dev, fileNameFilter, rtreeFactory, allFiles);
+        }
+
+        if (allFiles.isEmpty()) {
+            return validFiles;
+        }
+
+        if (allFiles.size() == 1) {
+            validFiles.add(new LSMComponentFileReferences(allFiles.get(0).fileRef, null, null));
+            return validFiles;
+        }
+
+        // Sorts files names from earliest to latest timestamp.
+        Collections.sort(allFiles);
+
+        List<ComparableFileName> validComparableFiles = new ArrayList<ComparableFileName>();
+        ComparableFileName last = allFiles.get(0);
+        validComparableFiles.add(last);
+        for (int i = 1; i < allFiles.size(); i++) {
+            ComparableFileName current = allFiles.get(i);
+            // The current start timestamp is greater than last stop timestamp so current is valid.
+            if (current.interval[0].compareTo(last.interval[1]) > 0) {
+                validComparableFiles.add(current);
+                last = current;
+            } else if (current.interval[0].compareTo(last.interval[0]) >= 0
+                    && current.interval[1].compareTo(last.interval[1]) <= 0) {
+                // The current file is completely contained in the interval of the 
+                // last file. Thus the last file must contain at least as much information 
+                // as the current file, so delete the current file.
+                current.fileRef.delete();
+            } else {
+                // This scenario should not be possible since timestamps are monotonically increasing.
+                throw new HyracksDataException("Found LSM files with overlapping timestamp intervals, "
+                        + "but the intervals were not contained by another file.");
+            }
+        }
+
+        // Sort valid files in reverse lexicographical order, such that newer files come first.
+        Collections.sort(validComparableFiles, recencyCmp);
+        for (ComparableFileName cmpFileName : validComparableFiles) {
+            validFiles.add(new LSMComponentFileReferences(cmpFileName.fileRef, null, null));
+        }
+
+        return validFiles;
+    }
+}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java
index e4d2e82..b3a4ab0 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java
@@ -39,11 +39,11 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexFileManager;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTree;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeFileManager;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeWithAntiMatterTuples;
+import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeWithAntiMatterTuplesFileManager;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.RTreeFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.tuples.LSMRTreeCopyTupleWriterFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.tuples.LSMRTreeTupleWriterFactory;
@@ -172,8 +172,8 @@
         IBinaryComparatorFactory[] linearizerArray = { linearizerCmpFactory,
                 btreeCmpFactories[btreeCmpFactories.length - 1] };
 
-        ILSMIndexFileManager fileNameManager = new LSMIndexFileManager(ioManager, diskFileMapProvider, file,
-                diskRTreeFactory, startIODeviceIndex);
+        ILSMIndexFileManager fileNameManager = new LSMRTreeWithAntiMatterTuplesFileManager(ioManager,
+                diskFileMapProvider, file, diskRTreeFactory, startIODeviceIndex);
         LSMRTreeWithAntiMatterTuples lsmTree = new LSMRTreeWithAntiMatterTuples(memBufferCache, memFreePageManager,
                 rtreeInteriorFrameFactory, rtreeLeafFrameFactory, btreeInteriorFrameFactory, btreeLeafFrameFactory,
                 fileNameManager, diskRTreeFactory, bulkLoadRTreeFactory, diskFileMapProvider, typeTraits.length,
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
index 773d593..c12dc50 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
@@ -848,7 +848,8 @@
     }
 
     @Override
-    public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput) throws TreeIndexException {
+    public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+            throws TreeIndexException {
         // TODO: verifyInput currently does nothing.
         try {
             return new RTreeBulkLoader(fillFactor);
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java
index 770a2ad..81f7fce 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java
@@ -591,7 +591,7 @@
             LOGGER.info("Bulk loading " + ins + " tuples");
         }
         long start = System.currentTimeMillis();
-        IIndexBulkLoader bulkLoader = treeIndex.createBulkLoader(0.7f, false);
+        IIndexBulkLoader bulkLoader = treeIndex.createBulkLoader(0.7f, false, ins);
         ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldCount);
         ArrayTupleReference tuple = new ArrayTupleReference();
         for (int i = 0; i < ins; i++) {
@@ -656,7 +656,7 @@
             treeIndex.activate();
 
             // Load sorted records, and expect to fail at tuple i.
-            IIndexBulkLoader bulkLoader = treeIndex.createBulkLoader(0.7f, true);
+            IIndexBulkLoader bulkLoader = treeIndex.createBulkLoader(0.7f, true, ins);
             for (int j = 0; j < ins; j++) {
                 if (j > i) {
                     fail("Bulk load failure test unexpectedly succeeded past tuple: " + j);
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/TreeIndexTestUtils.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/TreeIndexTestUtils.java
index 818373c..1a80231 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/TreeIndexTestUtils.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/TreeIndexTestUtils.java
@@ -106,7 +106,7 @@
     }
 
     public void checkDiskOrderScan(IIndexTestContext ctx) throws Exception {
-    	try {
+        try {
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("Testing Disk-Order Scan.");
             }
@@ -243,7 +243,7 @@
         ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fieldCount);
         ArrayTupleReference tuple = new ArrayTupleReference();
         // Perform bulk load.
-        IIndexBulkLoader bulkLoader = ctx.getIndex().createBulkLoader(0.7f, false);
+        IIndexBulkLoader bulkLoader = ctx.getIndex().createBulkLoader(0.7f, false, numTuples);
         int c = 1;
         for (CheckTuple checkTuple : checkTuples) {
             if (LOGGER.isLoggable(Level.INFO)) {
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java
index 3998108..f962200 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java
@@ -91,6 +91,15 @@
     public static final int LSM_INVINDEX_SCAN_COUNT_ARRAY_SIZE = 1000000;
     public static final int LSM_INVINDEX_MULTITHREAD_NUM_OPERATIONS = 200;
 
+    // Test params for BloomFilter
+    public static final int BLOOM_FILTER_NUM_TUPLES_TO_INSERT = 100;
+
+    // Mem configuration for BloomFilter.
+    public static final int BLOOM_FILTER_PAGE_SIZE = 256;
+    public static final int BLOOM_FILTER_NUM_PAGES = 1000;
+    public static final int BLOOM_FILTER_MAX_OPEN_FILES = 10;
+    public static final int BLOOM_FILTER_HYRACKS_FRAME_SIZE = 128;
+
 }
 
 /* ORIGINAL TEST PARAMETERS: DO NOT EDIT!
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/AbstractRTreeExamplesTest.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/AbstractRTreeExamplesTest.java
index e911a98..f93e9b6 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/AbstractRTreeExamplesTest.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/AbstractRTreeExamplesTest.java
@@ -688,7 +688,7 @@
             LOGGER.info("Bulk loading " + numInserts + " tuples");
         }
         long start = System.currentTimeMillis();
-        IIndexBulkLoader bulkLoader = treeIndex.createBulkLoader(0.7f, false);
+        IIndexBulkLoader bulkLoader = treeIndex.createBulkLoader(0.7f, false, numInserts);
         ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldCount);
         ArrayTupleReference tuple = new ArrayTupleReference();
 
diff --git a/hyracks-tests/hyracks-storage-am-bloomfilter-test/pom.xml b/hyracks-tests/hyracks-storage-am-bloomfilter-test/pom.xml
new file mode 100644
index 0000000..3b15677
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-bloomfilter-test/pom.xml
@@ -0,0 +1,49 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>edu.uci.ics.hyracks</groupId>
+  <artifactId>hyracks-storage-am-bloomfilter-test</artifactId>
+  <version>0.2.2-SNAPSHOT</version>
+
+  <parent>
+    <groupId>edu.uci.ics.hyracks</groupId>
+    <artifactId>hyracks-tests</artifactId>
+    <version>0.2.2-SNAPSHOT</version>
+  </parent>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+  	<dependency>
+  		<groupId>junit</groupId>
+  		<artifactId>junit</artifactId>
+  		<version>4.8.1</version>
+  		<type>jar</type>
+  		<scope>test</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-storage-am-bloomfilter</artifactId>
+  		<version>0.2.2-SNAPSHOT</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-test-support</artifactId>
+  		<version>0.2.2-SNAPSHOT</version>
+  		<type>jar</type>
+  		<scope>test</scope>
+  	</dependency>
+  </dependencies>
+</project>
diff --git a/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/BloomFilterTest.java b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/BloomFilterTest.java
new file mode 100644
index 0000000..8856da1
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/BloomFilterTest.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.bloomfilter;
+
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.logging.Level;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.util.AbstractBloomFilterTest;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+
+@SuppressWarnings("rawtypes")
+public class BloomFilterTest extends AbstractBloomFilterTest {
+    private final Random rnd = new Random(50);
+
+    @Before
+    public void setUp() throws HyracksDataException {
+        super.setUp();
+    }
+
+    @Test
+    public void singleFieldTest() throws Exception {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("TESTING BLOOM FILTER");
+        }
+
+        IBufferCache bufferCache = harness.getBufferCache();
+
+        int numElements = 100;
+        int[] keyFields = { 0 };
+        int numHashes = 5;
+
+        BloomFilter bf = new BloomFilter(bufferCache, harness.getFileMapProvider(), harness.getFileReference(),
+                keyFields);
+
+        bf.create();
+        bf.activate();
+        IIndexBulkLoader builder = bf.createBuilder(numElements, numHashes);
+
+        int fieldCount = 2;
+        ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fieldCount);
+        ArrayTupleReference tuple = new ArrayTupleReference();
+
+        // generate keys
+        int maxKey = 1000;
+        TreeSet<Integer> uniqueKeys = new TreeSet<Integer>();
+        ArrayList<Integer> keys = new ArrayList<Integer>();
+        while (uniqueKeys.size() < numElements) {
+            int key = rnd.nextInt() % maxKey;
+            uniqueKeys.add(key);
+        }
+        for (Integer i : uniqueKeys) {
+            keys.add(i);
+        }
+
+        // Insert tuples in the bloom filter
+        for (int i = 0; i < keys.size(); ++i) {
+            TupleUtils.createIntegerTuple(tupleBuilder, tuple, keys.get(i), i);
+            builder.add(tuple);
+        }
+        builder.end();
+
+        // Check all the inserted tuples can be found.
+
+        long[] hashes = new long[2];
+        for (int i = 0; i < keys.size(); ++i) {
+            TupleUtils.createIntegerTuple(tupleBuilder, tuple, keys.get(i), i);
+            Assert.assertTrue(bf.contains(tuple, hashes));
+        }
+
+        bf.deactivate();
+        bf.destroy();
+    }
+
+    @Test
+    public void multiFieldTest() throws Exception {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("TESTING BLOOM FILTER");
+        }
+
+        IBufferCache bufferCache = harness.getBufferCache();
+
+        int numElements = 10000;
+        int[] keyFields = { 2, 4, 1 };
+        int numHashes = 10;
+
+        BloomFilter bf = new BloomFilter(bufferCache, harness.getFileMapProvider(), harness.getFileReference(),
+                keyFields);
+
+        bf.create();
+        bf.activate();
+        IIndexBulkLoader builder = bf.createBuilder(numElements, numHashes);
+
+        int fieldCount = 5;
+        ISerializerDeserializer[] fieldSerdes = { UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE };
+        ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fieldCount);
+        ArrayTupleReference tuple = new ArrayTupleReference();
+
+        int maxLength = 20;
+        ArrayList<String> s1 = new ArrayList<String>();
+        ArrayList<String> s2 = new ArrayList<String>();
+        ArrayList<String> s3 = new ArrayList<String>();
+        ArrayList<String> s4 = new ArrayList<String>();
+        for (int i = 0; i < numElements; ++i) {
+            s1.add(randomString(rnd.nextInt() % maxLength, rnd));
+            s2.add(randomString(rnd.nextInt() % maxLength, rnd));
+            s3.add(randomString(rnd.nextInt() % maxLength, rnd));
+            s4.add(randomString(rnd.nextInt() % maxLength, rnd));
+        }
+
+        for (int i = 0; i < numElements; ++i) {
+            TupleUtils.createTuple(tupleBuilder, tuple, fieldSerdes, s1.get(i), s2.get(i), i, s3.get(i), s4.get(i));
+            builder.add(tuple);
+        }
+        builder.end();
+
+        long[] hashes = new long[2];
+        for (int i = 0; i < numElements; ++i) {
+            TupleUtils.createTuple(tupleBuilder, tuple, fieldSerdes, s1.get(i), s2.get(i), i, s3.get(i), s4.get(i));
+            Assert.assertTrue(bf.contains(tuple, hashes));
+        }
+
+        bf.deactivate();
+        bf.destroy();
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/MurmurHashForITupleReferenceTest.java b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/MurmurHashForITupleReferenceTest.java
new file mode 100644
index 0000000..284a6cb
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/MurmurHashForITupleReferenceTest.java
@@ -0,0 +1,296 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.bloomfilter;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.logging.Level;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.util.AbstractBloomFilterTest;
+
+@SuppressWarnings("rawtypes")
+public class MurmurHashForITupleReferenceTest extends AbstractBloomFilterTest {
+    private final static int NUM_LONG_VARS_FOR_128_BIT_HASH = 2;
+    private final static int DUMMY_FIELD = 0;
+    private final Random rnd = new Random(50);
+
+    @Before
+    public void setUp() throws HyracksDataException {
+        super.setUp();
+    }
+
+    @Test
+    public void murmurhashONEIntegerFieldTest() throws Exception {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("TESTING MURMUR HASH ONE INTEGER FIELD");
+        }
+
+        int fieldCount = 2;
+        ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fieldCount);
+        ArrayTupleReference tuple = new ArrayTupleReference();
+        TupleUtils.createIntegerTuple(tupleBuilder, tuple, rnd.nextInt());
+        tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+
+        int keyFields[] = { 0 };
+        int length = getTupleSize(tuple, keyFields);
+
+        long actuals[] = new long[NUM_LONG_VARS_FOR_128_BIT_HASH];
+        MurmurHash128Bit.hash3_x64_128(tuple, keyFields, 0L, actuals);
+
+        ByteBuffer buffer;
+        byte[] array = new byte[length];
+        fillArrayWithData(array, keyFields, tuple, length);
+        buffer = ByteBuffer.wrap(array);
+
+        long[] expecteds = hash3_x64_128(buffer, 0, length, 0L);
+        Assert.assertArrayEquals(expecteds, actuals);
+    }
+
+    @Test
+    public void murmurhashTwoIntegerFieldsTest() throws Exception {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("TESTING MURMUR HASH TWO INTEGER FIELDS");
+        }
+
+        int fieldCount = 2;
+        ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fieldCount);
+        ArrayTupleReference tuple = new ArrayTupleReference();
+        TupleUtils.createIntegerTuple(tupleBuilder, tuple, rnd.nextInt(), rnd.nextInt());
+        tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+
+        int keyFields[] = { 0, 1 };
+        int length = getTupleSize(tuple, keyFields);
+
+        long actuals[] = new long[NUM_LONG_VARS_FOR_128_BIT_HASH];
+        MurmurHash128Bit.hash3_x64_128(tuple, keyFields, 0L, actuals);
+
+        ByteBuffer buffer;
+        byte[] array = new byte[length];
+        fillArrayWithData(array, keyFields, tuple, length);
+        buffer = ByteBuffer.wrap(array);
+
+        long[] expecteds = hash3_x64_128(buffer, 0, length, 0L);
+        Assert.assertArrayEquals(expecteds, actuals);
+    }
+
+    @Test
+    public void murmurhashOneStringFieldTest() throws Exception {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("TESTING MURMUR HASH ONE STRING FIELD");
+        }
+
+        int fieldCount = 2;
+        ISerializerDeserializer[] fieldSerdes = { UTF8StringSerializerDeserializer.INSTANCE };
+        ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fieldCount);
+        ArrayTupleReference tuple = new ArrayTupleReference();
+        String s = randomString(100, rnd);
+        TupleUtils.createTuple(tupleBuilder, tuple, fieldSerdes, s);
+
+        int keyFields[] = { 0 };
+        int length = getTupleSize(tuple, keyFields);
+
+        long actuals[] = new long[NUM_LONG_VARS_FOR_128_BIT_HASH];
+        MurmurHash128Bit.hash3_x64_128(tuple, keyFields, 0L, actuals);
+
+        byte[] array = new byte[length];
+        ByteBuffer buffer;
+        fillArrayWithData(array, keyFields, tuple, length);
+        buffer = ByteBuffer.wrap(array);
+
+        long[] expecteds = hash3_x64_128(buffer, 0, length, 0L);
+        Assert.assertArrayEquals(expecteds, actuals);
+    }
+
+    @Test
+    public void murmurhashThreeStringFieldsTest() throws Exception {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("TESTING MURMUR HASH THREE STRING FIELDS");
+        }
+
+        int fieldCount = 3;
+        ISerializerDeserializer[] fieldSerdes = { UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE };
+        ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fieldCount);
+        ArrayTupleReference tuple = new ArrayTupleReference();
+        String s1 = randomString(40, rnd);
+        String s2 = randomString(60, rnd);
+        String s3 = randomString(20, rnd);
+        TupleUtils.createTuple(tupleBuilder, tuple, fieldSerdes, s1, s2, s3);
+
+        int keyFields[] = { 2, 0, 1 };
+        int length = getTupleSize(tuple, keyFields);
+
+        long actuals[] = new long[NUM_LONG_VARS_FOR_128_BIT_HASH];
+        MurmurHash128Bit.hash3_x64_128(tuple, keyFields, 0L, actuals);
+
+        byte[] array = new byte[length];
+        ByteBuffer buffer;
+        fillArrayWithData(array, keyFields, tuple, length);
+        buffer = ByteBuffer.wrap(array);
+
+        long[] expecteds = hash3_x64_128(buffer, 0, length, 0L);
+        Assert.assertArrayEquals(expecteds, actuals);
+    }
+
+    private void fillArrayWithData(byte[] array, int[] keyFields, ITupleReference tuple, int length) {
+        int currentFieldIndex = 0;
+        int bytePos = 0;
+        for (int i = 0; i < length; ++i) {
+            array[i] = tuple.getFieldData(DUMMY_FIELD)[tuple.getFieldStart(keyFields[currentFieldIndex]) + bytePos];
+            ++bytePos;
+            if (tuple.getFieldLength(keyFields[currentFieldIndex]) == bytePos) {
+                ++currentFieldIndex;
+                bytePos = 0;
+            }
+        }
+    }
+
+    private int getTupleSize(ITupleReference tuple, int[] keyFields) {
+        int length = 0;
+        for (int i = 0; i < keyFields.length; ++i) {
+            length += tuple.getFieldLength(keyFields[i]);
+        }
+        return length;
+    }
+
+    /**
+     * The hash3_x64_128 and getblock functions are borrowed from cassandra source code for testing purpose
+     **/
+    protected static long getblock(ByteBuffer key, int offset, int index) {
+        int i_8 = index << 3;
+        int blockOffset = offset + i_8;
+        return ((long) key.get(blockOffset + 0) & 0xff) + (((long) key.get(blockOffset + 1) & 0xff) << 8)
+                + (((long) key.get(blockOffset + 2) & 0xff) << 16) + (((long) key.get(blockOffset + 3) & 0xff) << 24)
+                + (((long) key.get(blockOffset + 4) & 0xff) << 32) + (((long) key.get(blockOffset + 5) & 0xff) << 40)
+                + (((long) key.get(blockOffset + 6) & 0xff) << 48) + (((long) key.get(blockOffset + 7) & 0xff) << 56);
+    }
+
+    public static long[] hash3_x64_128(ByteBuffer key, int offset, int length, long seed) {
+        final int nblocks = length >> 4; // Process as 128-bit blocks.
+
+        long h1 = seed;
+        long h2 = seed;
+
+        long c1 = 0x87c37b91114253d5L;
+        long c2 = 0x4cf5ad432745937fL;
+
+        //----------
+        // body
+
+        for (int i = 0; i < nblocks; i++) {
+            long k1 = getblock(key, offset, i * 2 + 0);
+            long k2 = getblock(key, offset, i * 2 + 1);
+
+            k1 *= c1;
+            k1 = MurmurHash128Bit.rotl64(k1, 31);
+            k1 *= c2;
+            h1 ^= k1;
+
+            h1 = MurmurHash128Bit.rotl64(h1, 27);
+            h1 += h2;
+            h1 = h1 * 5 + 0x52dce729;
+
+            k2 *= c2;
+            k2 = MurmurHash128Bit.rotl64(k2, 33);
+            k2 *= c1;
+            h2 ^= k2;
+
+            h2 = MurmurHash128Bit.rotl64(h2, 31);
+            h2 += h1;
+            h2 = h2 * 5 + 0x38495ab5;
+        }
+
+        //----------
+        // tail
+
+        // Advance offset to the unprocessed tail of the data.
+        offset += nblocks * 16;
+
+        long k1 = 0;
+        long k2 = 0;
+
+        switch (length & 15) {
+            case 15:
+                k2 ^= ((long) key.get(offset + 14)) << 48;
+            case 14:
+                k2 ^= ((long) key.get(offset + 13)) << 40;
+            case 13:
+                k2 ^= ((long) key.get(offset + 12)) << 32;
+            case 12:
+                k2 ^= ((long) key.get(offset + 11)) << 24;
+            case 11:
+                k2 ^= ((long) key.get(offset + 10)) << 16;
+            case 10:
+                k2 ^= ((long) key.get(offset + 9)) << 8;
+            case 9:
+                k2 ^= ((long) key.get(offset + 8)) << 0;
+                k2 *= c2;
+                k2 = MurmurHash128Bit.rotl64(k2, 33);
+                k2 *= c1;
+                h2 ^= k2;
+
+            case 8:
+                k1 ^= ((long) key.get(offset + 7)) << 56;
+            case 7:
+                k1 ^= ((long) key.get(offset + 6)) << 48;
+            case 6:
+                k1 ^= ((long) key.get(offset + 5)) << 40;
+            case 5:
+                k1 ^= ((long) key.get(offset + 4)) << 32;
+            case 4:
+                k1 ^= ((long) key.get(offset + 3)) << 24;
+            case 3:
+                k1 ^= ((long) key.get(offset + 2)) << 16;
+            case 2:
+                k1 ^= ((long) key.get(offset + 1)) << 8;
+            case 1:
+                k1 ^= ((long) key.get(offset));
+                k1 *= c1;
+                k1 = MurmurHash128Bit.rotl64(k1, 31);
+                k1 *= c2;
+                h1 ^= k1;
+        };
+
+        //----------
+        // finalization
+
+        h1 ^= length;
+        h2 ^= length;
+
+        h1 += h2;
+        h2 += h1;
+
+        h1 = MurmurHash128Bit.fmix(h1);
+        h2 = MurmurHash128Bit.fmix(h2);
+
+        h1 += h2;
+        h2 += h1;
+
+        return (new long[] { h1, h2 });
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/util/AbstractBloomFilterTest.java b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/util/AbstractBloomFilterTest.java
new file mode 100644
index 0000000..9b857a6
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/util/AbstractBloomFilterTest.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.bloomfilter.util;
+
+import java.util.Random;
+import java.util.logging.Logger;
+
+import org.junit.After;
+import org.junit.Before;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class AbstractBloomFilterTest {
+    protected final Logger LOGGER = Logger.getLogger(BloomFilterTestHarness.class.getName());
+
+    protected final BloomFilterTestHarness harness;
+
+    public AbstractBloomFilterTest() {
+        harness = new BloomFilterTestHarness();
+    }
+
+    public AbstractBloomFilterTest(int pageSize, int numPages, int maxOpenFiles, int hyracksFrameSize) {
+        harness = new BloomFilterTestHarness(pageSize, numPages, maxOpenFiles, hyracksFrameSize);
+    }
+
+    @Before
+    public void setUp() throws HyracksDataException {
+        harness.setUp();
+    }
+
+    @After
+    public void tearDown() throws HyracksDataException {
+        harness.tearDown();
+    }
+
+    public static String randomString(int length, Random random) {
+        char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
+        StringBuilder strBuilder = new StringBuilder();
+        for (int i = 0; i < length; ++i) {
+            char c = chars[random.nextInt(chars.length)];
+            strBuilder.append(c);
+        }
+        return strBuilder.toString();
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/util/BloomFilterTestHarness.java b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/util/BloomFilterTestHarness.java
new file mode 100644
index 0000000..8fac122
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/util/BloomFilterTestHarness.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.bloomfilter.util;
+
+import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Random;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+import edu.uci.ics.hyracks.test.support.TestStorageManagerComponentHolder;
+import edu.uci.ics.hyracks.test.support.TestUtils;
+
+public class BloomFilterTestHarness {
+
+    private static final long RANDOM_SEED = 50;
+
+    protected final int pageSize;
+    protected final int numPages;
+    protected final int maxOpenFiles;
+    protected final int hyracksFrameSize;
+
+    protected IHyracksTaskContext ctx;
+    protected IBufferCache bufferCache;
+    protected IFileMapProvider fileMapProvider;
+    protected FileReference file;
+
+    protected final Random rnd = new Random();
+    protected final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
+    protected final String tmpDir = System.getProperty("java.io.tmpdir");
+    protected final String sep = System.getProperty("file.separator");
+    protected String fileName;
+
+    public BloomFilterTestHarness() {
+        this.pageSize = AccessMethodTestsConfig.BLOOM_FILTER_PAGE_SIZE;
+        this.numPages = AccessMethodTestsConfig.BLOOM_FILTER_NUM_PAGES;
+        this.maxOpenFiles = AccessMethodTestsConfig.BLOOM_FILTER_MAX_OPEN_FILES;
+        this.hyracksFrameSize = AccessMethodTestsConfig.BLOOM_FILTER_HYRACKS_FRAME_SIZE;
+    }
+
+    public BloomFilterTestHarness(int pageSize, int numPages, int maxOpenFiles, int hyracksFrameSize) {
+        this.pageSize = pageSize;
+        this.numPages = numPages;
+        this.maxOpenFiles = maxOpenFiles;
+        this.hyracksFrameSize = hyracksFrameSize;
+    }
+
+    public void setUp() throws HyracksDataException {
+        fileName = tmpDir + sep + simpleDateFormat.format(new Date());
+        ctx = TestUtils.create(getHyracksFrameSize());
+        TestStorageManagerComponentHolder.init(pageSize, numPages, maxOpenFiles);
+        bufferCache = TestStorageManagerComponentHolder.getBufferCache(ctx);
+        fileMapProvider = TestStorageManagerComponentHolder.getFileMapProvider(ctx);
+        file = new FileReference(new File(fileName));
+        rnd.setSeed(RANDOM_SEED);
+    }
+
+    public void tearDown() throws HyracksDataException {
+        bufferCache.close();
+        file.delete();
+    }
+
+    public IHyracksTaskContext getHyracksTaskContext() {
+        return ctx;
+    }
+
+    public IBufferCache getBufferCache() {
+        return bufferCache;
+    }
+
+    public IFileMapProvider getFileMapProvider() {
+        return fileMapProvider;
+    }
+
+    public FileReference getFileReference() {
+        return file;
+    }
+
+    public String getFileName() {
+        return fileName;
+    }
+
+    public Random getRandom() {
+        return rnd;
+    }
+
+    public int getPageSize() {
+        return pageSize;
+    }
+
+    public int getNumPages() {
+        return numPages;
+    }
+
+    public int getHyracksFrameSize() {
+        return hyracksFrameSize;
+    }
+
+    public int getMaxOpenFiles() {
+        return maxOpenFiles;
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
index d0d2c77..64f58c3 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
@@ -228,7 +228,7 @@
                 throw new IllegalArgumentException("Invalid range: [" + begin + ", " + end + "]");
             }
 
-            IIndexBulkLoader bulkloader = index.createBulkLoader(1.0f, false);
+            IIndexBulkLoader bulkloader = index.createBulkLoader(1.0f, false, end - begin);
             for (int i = begin; i <= end; i++) {
                 TupleUtils.createIntegerTuple(builder, tuple, i);
                 bulkloader.add(tuple);
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/BTreeBulkLoadRunner.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/BTreeBulkLoadRunner.java
index 2e9395f..69e2b58 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/BTreeBulkLoadRunner.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/BTreeBulkLoadRunner.java
@@ -37,7 +37,7 @@
     public long runExperiment(DataGenThread dataGen, int numThreads) throws Exception {
         btree.create();
         long start = System.currentTimeMillis();
-        IIndexBulkLoader bulkLoader = btree.createBulkLoader(1.0f, false);
+        IIndexBulkLoader bulkLoader = btree.createBulkLoader(1.0f, false, 0L);
         for (int i = 0; i < numBatches; i++) {
             TupleBatch batch = dataGen.tupleBatchQueue.take();
             for (int j = 0; j < batch.size(); j++) {
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
index 7b54884..97f78f3 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
@@ -205,7 +205,7 @@
         ISerializerDeserializer[] fieldSerdes = testCtx.getFieldSerdes();
 
         // Use the expected index to bulk-load the actual index.
-        IIndexBulkLoader bulkLoader = testCtx.getIndex().createBulkLoader(1.0f, false);
+        IIndexBulkLoader bulkLoader = testCtx.getIndex().createBulkLoader(1.0f, false, numDocs);
         ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(testCtx.getFieldSerdes().length);
         ArrayTupleReference tuple = new ArrayTupleReference();
         Iterator<CheckTuple> checkTupleIter = tmpMemIndex.iterator();
diff --git a/hyracks-tests/pom.xml b/hyracks-tests/pom.xml
index b79295a..4011339 100644
--- a/hyracks-tests/pom.xml
+++ b/hyracks-tests/pom.xml
@@ -19,5 +19,6 @@
     <module>hyracks-storage-am-lsm-btree-test</module>
     <module>hyracks-storage-am-lsm-rtree-test</module>
     <module>hyracks-storage-am-lsm-invertedindex-test</module>
+    <module>hyracks-storage-am-bloomfilter-test</module>
   </modules>
 </project>
diff --git a/pom.xml b/pom.xml
index 49a5887..c642992 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,6 +95,7 @@
     <module>hyracks-cli</module>
     <module>hyracks-storage-common</module>
     <module>hyracks-storage-am-common</module>
+    <module>hyracks-storage-am-bloomfilter</module>
     <module>hyracks-storage-am-btree</module>
     <module>hyracks-storage-am-lsm-invertedindex</module>
     <module>hyracks-storage-am-lsm-common</module>