Fixed couple of bugs, added minor optimizations, and added more tests.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree_bloom_filter@2704 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/api/IFilter.java b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/api/IFilter.java
index 04c9f0b..06ead24 100644
--- a/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/api/IFilter.java
+++ b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/api/IFilter.java
@@ -20,12 +20,12 @@
 
 public interface IFilter {
 
+    public void add(ITupleReference tuple, long[] hashes);
+
+    public boolean contains(ITupleReference tuple, long[] hashes);
+
     public void create() throws HyracksDataException;
 
-    public void add(ITupleReference tuple);
-
-    public boolean contains(ITupleReference tuple);
-
     public void destroy() throws HyracksDataException;
 
     public void activate() throws HyracksDataException;
diff --git a/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilter.java b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
index 217269f..8e0c71d 100644
--- a/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
+++ b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
@@ -32,32 +32,48 @@
     private final static int METADATA_PAGE_ID = 0;
     private final static int NUM_PAGES_OFFSET = 0; // 0
     private final static int NUM_HASHES_USED_OFFSET = NUM_PAGES_OFFSET + 8; // 8
+    private final static int NUM_BITS = NUM_HASHES_USED_OFFSET + 4; // 12
 
     private final static int NUM_BITS_PER_ELEMENT = 10;
 
     private final IBufferCache bufferCache;
     private final IFileMapProvider fileMapProvider;
     private final FileReference file;
-    private final long numElements;
     private final int[] keyFields;
-    private final int numHashes;
+    private int numHashes;
     private int fileId = -1;
     private boolean isActivated = false;
-    private final long numPages;
+    private long numPages;
+
+    private long numBits;
+    private int numBitsPerPage;
+
     private final ArrayList<ICachedPage> bloomFilterPages = new ArrayList<ICachedPage>();
     private final static long SEED = 0L;
-    private final int numBitsPerPage;
 
-    public BloomFilter(IBufferCache bufferCache, IFileMapProvider fileMapProvider, FileReference file,
-            long numElements, int[] keyFields, int numHashes) {
+    private final boolean isNewFilter;
+
+    public BloomFilter(IBufferCache bufferCache, IFileMapProvider fileMapProvider, FileReference file, int[] keyFields,
+            long numElements, int numHashes) throws HyracksDataException {
         this.bufferCache = bufferCache;
         this.fileMapProvider = fileMapProvider;
         this.file = file;
-        this.numElements = numElements;
         this.keyFields = keyFields;
         this.numHashes = numHashes;
         numBitsPerPage = bufferCache.getPageSize() * Byte.SIZE;
-        numPages = (long) Math.ceil((numElements * NUM_BITS_PER_ELEMENT) / (double) numBitsPerPage);
+        numBits = numElements * NUM_BITS_PER_ELEMENT;
+        numPages = (long) Math.ceil(numBits / (double) numBitsPerPage);
+        isNewFilter = true;
+    }
+
+    public BloomFilter(IBufferCache bufferCache, IFileMapProvider fileMapProvider, FileReference file, int[] keyFields)
+            throws HyracksDataException {
+        this.bufferCache = bufferCache;
+        this.fileMapProvider = fileMapProvider;
+        this.file = file;
+        this.keyFields = keyFields;
+        numBitsPerPage = bufferCache.getPageSize() * Byte.SIZE;
+        isNewFilter = false;
     }
 
     public int getFileId() {
@@ -69,16 +85,15 @@
     }
 
     @Override
-    public void add(ITupleReference tuple) {
-        long[] hashes = new long[2];
+    public void add(ITupleReference tuple, long[] hashes) {
         MurmurHash128Bit.hash3_x64_128(tuple, keyFields, SEED, hashes);
         for (int i = 0; i < numHashes; ++i) {
-            long hash = Math.abs((hashes[0] + (long) i * hashes[1]) % numElements);
+            long hash = Math.abs((hashes[0] + (long) i * hashes[1]) % numBits);
 
             ByteBuffer buffer = bloomFilterPages.get((int) (hash / numBitsPerPage)).getBuffer();
-            int byteIndex = (int) (hash % numBitsPerPage) / Byte.SIZE;
+            int byteIndex = (int) (hash % numBitsPerPage) >> 3; // divide by 8
             byte b = buffer.get(byteIndex);
-            int bitIndex = (int) (hash % numBitsPerPage) % Byte.SIZE;
+            int bitIndex = (int) (hash % numBitsPerPage) & 0x07; // mod 8
             b = (byte) (b | (1 << bitIndex));
 
             buffer.put(byteIndex, b);
@@ -86,16 +101,15 @@
     }
 
     @Override
-    public boolean contains(ITupleReference tuple) {
-        long[] hashes = new long[2];
+    public boolean contains(ITupleReference tuple, long[] hashes) {
         MurmurHash128Bit.hash3_x64_128(tuple, keyFields, SEED, hashes);
         for (int i = 0; i < numHashes; ++i) {
-            long hash = Math.abs((hashes[0] + (long) i * hashes[1]) % numElements);
+            long hash = Math.abs((hashes[0] + (long) i * hashes[1]) % numBits);
 
             ByteBuffer buffer = bloomFilterPages.get((int) (hash / numBitsPerPage)).getBuffer();
-            int byteIndex = (int) (hash % numBitsPerPage) / Byte.SIZE;
+            int byteIndex = (int) (hash % numBitsPerPage) >> 3; // divide by 8
             byte b = buffer.get(byteIndex);
-            int bitIndex = (int) (hash % numBitsPerPage) % Byte.SIZE;
+            int bitIndex = (int) (hash % numBitsPerPage) & 0x07; // mod 8
 
             if (!((b & (1L << bitIndex)) != 0)) {
                 return false;
@@ -104,64 +118,69 @@
         return true;
     }
 
+    private void prepareFile() throws HyracksDataException {
+        boolean fileIsMapped = false;
+        synchronized (fileMapProvider) {
+            fileIsMapped = fileMapProvider.isMapped(file);
+            if (!fileIsMapped) {
+                bufferCache.createFile(file);
+            }
+            fileId = fileMapProvider.lookupFileId(file);
+            try {
+                // Also creates the file if it doesn't exist yet.
+                bufferCache.openFile(fileId);
+            } catch (HyracksDataException e) {
+                // Revert state of buffer cache since file failed to open.
+                if (!fileIsMapped) {
+                    bufferCache.deleteFile(fileId, false);
+                }
+                throw e;
+            }
+        }
+    }
+
+    private void persistBloomFilterMetaData() throws HyracksDataException {
+        ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), false);
+        metaPage.acquireWriteLatch();
+        metaPage.getBuffer().putLong(NUM_PAGES_OFFSET, numPages);
+        metaPage.getBuffer().putInt(NUM_HASHES_USED_OFFSET, numHashes);
+        metaPage.getBuffer().putLong(NUM_BITS, numBits);
+        metaPage.releaseWriteLatch();
+        bufferCache.unpin(metaPage);
+    }
+
+    private void readBloomFilterMetaData() throws HyracksDataException {
+        ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), false);
+        metaPage.acquireReadLatch();
+        numPages = metaPage.getBuffer().getLong(NUM_PAGES_OFFSET);
+        numHashes = metaPage.getBuffer().getInt(NUM_HASHES_USED_OFFSET);
+        numBits = metaPage.getBuffer().getLong(NUM_BITS);
+        metaPage.releaseReadLatch();
+        bufferCache.unpin(metaPage);
+    }
+
     @Override
     public synchronized void create() throws HyracksDataException {
         if (isActivated) {
             throw new HyracksDataException("Failed to create the bloom filter since it is activated.");
+        } else if (!isNewFilter) {
+            throw new HyracksDataException(
+                    "Cannot create a bloom filter with this bloom filter instance. Please use a different bloom filter constructor.");
         }
 
-        boolean fileIsMapped = false;
-        synchronized (fileMapProvider) {
-            fileIsMapped = fileMapProvider.isMapped(file);
-            if (!fileIsMapped) {
-                bufferCache.createFile(file);
-            }
-            fileId = fileMapProvider.lookupFileId(file);
-            try {
-                // Also creates the file if it doesn't exist yet.
-                bufferCache.openFile(fileId);
-            } catch (HyracksDataException e) {
-                // Revert state of buffer cache since file failed to open.
-                if (!fileIsMapped) {
-                    bufferCache.deleteFile(fileId, false);
-                }
-                throw e;
-            }
-        }
-        ICachedPage page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), false);
-        page.acquireWriteLatch();
-        page.getBuffer().putLong(NUM_PAGES_OFFSET, numPages);
-        page.getBuffer().putInt(NUM_HASHES_USED_OFFSET, numHashes);
-        page.releaseWriteLatch();
-        bufferCache.unpin(page);
+        prepareFile();
+        persistBloomFilterMetaData();
         bufferCache.closeFile(fileId);
     }
 
     @Override
-    public void activate() throws HyracksDataException {
+    public synchronized void activate() throws HyracksDataException {
         if (isActivated) {
             return;
         }
 
-        boolean fileIsMapped = false;
-        synchronized (fileMapProvider) {
-            fileIsMapped = fileMapProvider.isMapped(file);
-            if (!fileIsMapped) {
-                bufferCache.createFile(file);
-            }
-            fileId = fileMapProvider.lookupFileId(file);
-            try {
-                // Also creates the file if it doesn't exist yet.
-                bufferCache.openFile(fileId);
-            } catch (HyracksDataException e) {
-                // Revert state of buffer cache since file failed to open.
-                if (!fileIsMapped) {
-                    bufferCache.deleteFile(fileId, false);
-                }
-                throw e;
-            }
-        }
-        isActivated = true;
+        prepareFile();
+        readBloomFilterMetaData();
 
         int currentPageId = 1;
         while (currentPageId <= numPages) {
@@ -170,10 +189,11 @@
             bloomFilterPages.add(page);
             ++currentPageId;
         }
+        isActivated = true;
     }
 
     @Override
-    public void deactivate() throws HyracksDataException {
+    public synchronized void deactivate() throws HyracksDataException {
         if (!isActivated) {
             return;
         }
@@ -183,12 +203,13 @@
             page.releaseWriteLatch();
             bufferCache.unpin(page);
         }
+        bloomFilterPages.clear();
         bufferCache.closeFile(fileId);
         isActivated = false;
     }
 
     @Override
-    public void destroy() throws HyracksDataException {
+    public synchronized void destroy() throws HyracksDataException {
         if (isActivated) {
             throw new HyracksDataException("Failed to destroy the bloom filter since it is activated.");
         }
diff --git a/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/BloomFilterTest.java b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/BloomFilterTest.java
index b9f5bf2..ea5ae4b 100644
--- a/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/BloomFilterTest.java
+++ b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/BloomFilterTest.java
@@ -24,16 +24,19 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
 import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilter;
 import edu.uci.ics.hyracks.storage.am.bloomfilter.util.AbstractBloomFilterTest;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 
+@SuppressWarnings("rawtypes")
 public class BloomFilterTest extends AbstractBloomFilterTest {
-    private final int fieldCount = 2;
     private final Random rnd = new Random(50);
 
     @Before
@@ -51,23 +54,23 @@
 
         long numElements = 100L;
         int[] keyFields = { 0 };
-        int numHashes = 10;
+        int numHashes = 5;
 
         BloomFilter bf = new BloomFilter(bufferCache, harness.getFileMapProvider(), harness.getFileReference(),
-                numElements, keyFields, numHashes);
+                keyFields, numElements, numHashes);
 
         bf.create();
         bf.activate();
 
+        int fieldCount = 2;
         ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fieldCount);
         ArrayTupleReference tuple = new ArrayTupleReference();
 
         // generate keys
-        int numKeys = 50;
         int maxKey = 1000;
         TreeSet<Integer> uniqueKeys = new TreeSet<Integer>();
         ArrayList<Integer> keys = new ArrayList<Integer>();
-        while (uniqueKeys.size() < numKeys) {
+        while (uniqueKeys.size() < numElements) {
             int key = rnd.nextInt() % maxKey;
             uniqueKeys.add(key);
         }
@@ -75,14 +78,71 @@
             keys.add(i);
         }
 
+        long[] hashes = new long[2];
+        // Check against an empty bloom filter
         for (int i = 0; i < keys.size(); ++i) {
-
             TupleUtils.createIntegerTuple(tupleBuilder, tuple, keys.get(i), i);
-            tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+            Assert.assertFalse(bf.contains(tuple, hashes));
+        }
 
-            bf.add(tuple);
+        // Check all the inserted tuples can be found
+        for (int i = 0; i < keys.size(); ++i) {
+            TupleUtils.createIntegerTuple(tupleBuilder, tuple, keys.get(i), i);
+            bf.add(tuple, hashes);
+            Assert.assertTrue(bf.contains(tuple, hashes));
+        }
 
-            Assert.assertTrue(bf.contains(tuple));
+        // Deactivate the bllom filter
+        bf.deactivate();
+
+        // Activate the bloom filter and check the tuples again
+        bf.activate();
+        for (int i = 0; i < keys.size(); ++i) {
+            TupleUtils.createIntegerTuple(tupleBuilder, tuple, keys.get(i), i);
+            bf.add(tuple, hashes);
+            Assert.assertTrue(bf.contains(tuple, hashes));
+        }
+
+        bf.deactivate();
+        bf.destroy();
+    }
+
+    @Test
+    public void multiFieldTest() throws Exception {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("TESTING BLOOM FILTER");
+        }
+
+        IBufferCache bufferCache = harness.getBufferCache();
+
+        long numElements = 10000L;
+        int[] keyFields = { 2, 4, 1 };
+        int numHashes = 10;
+
+        BloomFilter bf = new BloomFilter(bufferCache, harness.getFileMapProvider(), harness.getFileReference(),
+                keyFields, numElements, numHashes);
+
+        bf.create();
+        bf.activate();
+
+        int fieldCount = 5;
+        ISerializerDeserializer[] fieldSerdes = { UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE };
+        ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fieldCount);
+        ArrayTupleReference tuple = new ArrayTupleReference();
+
+        long[] hashes = new long[2];
+        int maxLength = 20;
+        for (int i = 0; i < numElements; ++i) {
+            String s1 = randomString(rnd.nextInt() % maxLength, rnd);
+            String s2 = randomString(rnd.nextInt() % maxLength, rnd);
+            String s3 = randomString(rnd.nextInt() % maxLength, rnd);
+            String s4 = randomString(rnd.nextInt() % maxLength, rnd);
+            TupleUtils.createTuple(tupleBuilder, tuple, fieldSerdes, s1, s2, rnd.nextInt(), s3, s4);
+
+            bf.add(tuple, hashes);
+            Assert.assertTrue(bf.contains(tuple, hashes));
         }
 
         bf.deactivate();
diff --git a/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/MurmurHashForITupleReferenceTest.java b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/MurmurHashForITupleReferenceTest.java
index 4c694a0..8fd1147 100644
--- a/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/MurmurHashForITupleReferenceTest.java
+++ b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/MurmurHashForITupleReferenceTest.java
@@ -178,15 +178,7 @@
         return length;
     }
 
-    public static String randomString(int length, Random random) {
-        char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
-        StringBuilder strBuilder = new StringBuilder();
-        for (int i = 0; i < length; ++i) {
-            char c = chars[random.nextInt(chars.length)];
-            strBuilder.append(c);
-        }
-        return strBuilder.toString();
-    }
+    
 
     protected static long getblock(ByteBuffer key, int offset, int index) {
         int i_8 = index << 3;
diff --git a/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/util/AbstractBloomFilterTest.java b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/util/AbstractBloomFilterTest.java
index 9712da9..9b857a6 100644
--- a/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/util/AbstractBloomFilterTest.java
+++ b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/util/AbstractBloomFilterTest.java
@@ -15,6 +15,7 @@
 
 package edu.uci.ics.hyracks.storage.am.bloomfilter.util;
 
+import java.util.Random;
 import java.util.logging.Logger;
 
 import org.junit.After;
@@ -44,4 +45,14 @@
     public void tearDown() throws HyracksDataException {
         harness.tearDown();
     }
+
+    public static String randomString(int length, Random random) {
+        char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
+        StringBuilder strBuilder = new StringBuilder();
+        for (int i = 0; i < length; ++i) {
+            char c = chars[random.nextInt(chars.length)];
+            strBuilder.append(c);
+        }
+        return strBuilder.toString();
+    }
 }
diff --git a/hyracks-tests/hyracks-storage-am-bloomfilter-test/target/test-classes/edu/uci/ics/hyracks/storage/am/bloomfilter/BloomFilterTest.class b/hyracks-tests/hyracks-storage-am-bloomfilter-test/target/test-classes/edu/uci/ics/hyracks/storage/am/bloomfilter/BloomFilterTest.class
new file mode 100644
index 0000000..2a9be9d
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-bloomfilter-test/target/test-classes/edu/uci/ics/hyracks/storage/am/bloomfilter/BloomFilterTest.class
Binary files differ
diff --git a/hyracks-tests/hyracks-storage-am-bloomfilter-test/target/test-classes/edu/uci/ics/hyracks/storage/am/bloomfilter/MurmurHashForITupleReferenceTest.class b/hyracks-tests/hyracks-storage-am-bloomfilter-test/target/test-classes/edu/uci/ics/hyracks/storage/am/bloomfilter/MurmurHashForITupleReferenceTest.class
new file mode 100644
index 0000000..aa61a5e
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-bloomfilter-test/target/test-classes/edu/uci/ics/hyracks/storage/am/bloomfilter/MurmurHashForITupleReferenceTest.class
Binary files differ
diff --git a/hyracks-tests/hyracks-storage-am-bloomfilter-test/target/test-classes/edu/uci/ics/hyracks/storage/am/bloomfilter/util/AbstractBloomFilterTest.class b/hyracks-tests/hyracks-storage-am-bloomfilter-test/target/test-classes/edu/uci/ics/hyracks/storage/am/bloomfilter/util/AbstractBloomFilterTest.class
new file mode 100644
index 0000000..2f406c0
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-bloomfilter-test/target/test-classes/edu/uci/ics/hyracks/storage/am/bloomfilter/util/AbstractBloomFilterTest.class
Binary files differ
diff --git a/hyracks-tests/hyracks-storage-am-bloomfilter-test/target/test-classes/edu/uci/ics/hyracks/storage/am/bloomfilter/util/BloomFilterTestHarness.class b/hyracks-tests/hyracks-storage-am-bloomfilter-test/target/test-classes/edu/uci/ics/hyracks/storage/am/bloomfilter/util/BloomFilterTestHarness.class
new file mode 100644
index 0000000..c204b8d
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-bloomfilter-test/target/test-classes/edu/uci/ics/hyracks/storage/am/bloomfilter/util/BloomFilterTestHarness.class
Binary files differ