[ASTERIXDB-2186][STO] Cache-friendly Bloom Filter

- user model changes: no
- storage format changes: yes. Change the format of bloom filter
- interface changes: no

Details:
- Introduce blocked bloom filter to guarantee only 1 random memory
access is required during each membership check. It improves bloom
filter performance by 2x - 4x, depending on the ratio of positive
queries.
- For legacy bloom filters, we fall back to previous implementation
based on the stored version in the metadata.
- Add pinAllPages/unpinAllPages method to reduce pin/unpin overhead.

Change-Id: I0e8e0db9b60d5addfaf61ebb372a1bcb2d2d5957
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2201
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index eaf9bbf..b6d4f6b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -139,6 +139,7 @@
     public static final int VBC_ALREADY_CLOSED = 103;
     public static final int INDEX_DOES_NOT_EXIST = 104;
     public static final int CANNOT_DROP_IN_USE_INDEX = 105;
+    public static final int CANNOT_DEACTIVATE_PINNED_BLOOM_FILTER = 106;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 1f82a17..6254b86 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -122,5 +122,6 @@
 103 = Failed to close virtual buffer cache since it is already closed
 104 = Index does not exist
 105 = Cannot drop in-use index (%1$s)
+106 = Failed to deactivate the bloom filter since it is pinned by other users
 
 10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomCalculations.java b/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomCalculations.java
index 5b387f4..e0cf1a80 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomCalculations.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomCalculations.java
@@ -43,12 +43,9 @@
      * Each cell (i,j) the false positive rate determined by using i buckets per
      * element and j hash functions.
      */
-    static final double[][] probs = new double[][] {
-            { 1.0 }, // dummy row representing 0 buckets per element
+    static final double[][] probs = new double[][] { { 1.0 }, // dummy row representing 0 buckets per element
             { 1.0, 1.0 }, // dummy row representing 1 buckets per element
-            { 1.0, 0.393, 0.400 },
-            { 1.0, 0.283, 0.237, 0.253 },
-            { 1.0, 0.221, 0.155, 0.147, 0.160 },
+            { 1.0, 0.393, 0.400 }, { 1.0, 0.283, 0.237, 0.253 }, { 1.0, 0.221, 0.155, 0.147, 0.160 },
             { 1.0, 0.181, 0.109, 0.092, 0.092, 0.101 }, // 5
             { 1.0, 0.154, 0.0804, 0.0609, 0.0561, 0.0578, 0.0638 },
             { 1.0, 0.133, 0.0618, 0.0423, 0.0359, 0.0347, 0.0364 },
@@ -63,8 +60,8 @@
             { 1.0, 0.0606, 0.0138, 0.005, 0.00239, 0.00139, 0.000935, 0.000702, 0.000574, 0.000505, 0.00047, 0.000459 },
             { 1.0, 0.0571, 0.0123, 0.00423, 0.00193, 0.00107, 0.000692, 0.000499, 0.000394, 0.000335, 0.000302,
                     0.000287, 0.000284 },
-            { 1.0, 0.054, 0.0111, 0.00362, 0.00158, 0.000839, 0.000519, 0.00036, 0.000275, 0.000226, 0.000198,
-                    0.000183, 0.000176 },
+            { 1.0, 0.054, 0.0111, 0.00362, 0.00158, 0.000839, 0.000519, 0.00036, 0.000275, 0.000226, 0.000198, 0.000183,
+                    0.000176 },
             { 1.0, 0.0513, 0.00998, 0.00312, 0.0013, 0.000663, 0.000394, 0.000264, 0.000194, 0.000155, 0.000132,
                     0.000118, 0.000111, 0.000109 },
             { 1.0, 0.0488, 0.00906, 0.0027, 0.00108, 0.00053, 0.000303, 0.000196, 0.00014, 0.000108, 8.89e-05,
@@ -148,8 +145,9 @@
             K--;
         }
 
-        return new BloomFilterSpecification(K, bucketsPerElement);
-    }
+        // we allocate one more bucket per element to compensate the effect introduced by using blocked bloom filter
+        // a detail analysis can be found at https://dl.acm.org/citation.cfm?id=1594230
+        return new BloomFilterSpecification(K, bucketsPerElement + 1);    }
 
     /**
      * Calculates the maximum number of buckets per element that this implementation
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
index 68b96a3..3d8782a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
@@ -38,6 +38,14 @@
     private static final int NUM_HASHES_USED_OFFSET = NUM_PAGES_OFFSET + 4; // 4
     private static final int NUM_ELEMENTS_OFFSET = NUM_HASHES_USED_OFFSET + 4; // 8
     private static final int NUM_BITS_OFFSET = NUM_ELEMENTS_OFFSET + 8; // 12
+    private static final int VERSION_OFFSET = NUM_BITS_OFFSET + 8; // 20
+
+    // we use cache line size as the block size (64 bytes)
+    private static final int NUM_BITS_PER_BLOCK = 64 * 8;
+
+    private static final int DEFAULT_BLOOM_FILTER_VERSION = 0;
+
+    private static final int BLOCKED_BLOOM_FILTER_VERSION = 1;
 
     private final IBufferCache bufferCache;
     private final FileReference file;
@@ -49,7 +57,13 @@
     private int numHashes;
     private long numElements;
     private long numBits;
+    // keep trace of the version of the bloomfilter to be backward compatible
+    private int version;
     private final int numBitsPerPage;
+    private final int numBlocksPerPage;
+    private ICachedPage[] pages;
+    private int pinCount = 0;
+    private boolean pagesPinned = false;
     private static final byte[] ZERO_BUFFER = new byte[131072]; // 128kb
     private static final long SEED = 0L;
 
@@ -58,6 +72,7 @@
         this.file = file;
         this.keyFields = keyFields;
         this.numBitsPerPage = bufferCache.getPageSize() * Byte.SIZE;
+        this.numBlocksPerPage = this.numBitsPerPage / NUM_BITS_PER_BLOCK;
     }
 
     public int getFileId() {
@@ -68,6 +83,30 @@
         return file;
     }
 
+    public synchronized void pinAllPages() throws HyracksDataException {
+        if (pinCount == 0) {
+            // first time pin
+            if (pages == null) {
+                pages = new ICachedPage[numPages];
+            }
+            for (int i = 0; i < numPages; i++) {
+                pages[i] = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, i + 1), false);
+            }
+            pagesPinned = true;
+        }
+        pinCount++;
+    }
+
+    public synchronized void unpinAllPages() throws HyracksDataException {
+        if (pinCount == 1) {
+            for (int i = 0; i < numPages; i++) {
+                bufferCache.unpin(pages[i]);
+            }
+            pagesPinned = false;
+        }
+        pinCount--;
+    }
+
     public int getNumPages() throws HyracksDataException {
         if (!isActivated) {
             activate();
@@ -87,6 +126,51 @@
             return false;
         }
         MurmurHash128Bit.hash3_x64_128(tuple, keyFields, SEED, hashes);
+        if (version == BLOCKED_BLOOM_FILTER_VERSION) {
+            return blockContains(hashes);
+        } else {
+            return legacyContains(hashes);
+        }
+    }
+
+    private boolean blockContains(long[] hashes) throws HyracksDataException {
+        // take first hash to compute block id
+        long hash = Math.abs(hashes[0] % numBits);
+        long blockId = hash / NUM_BITS_PER_BLOCK;
+        int pageId = (int) (blockId / numBlocksPerPage);
+        long groupStartIndex = (blockId % numBlocksPerPage) * NUM_BITS_PER_BLOCK;
+
+        boolean unpinWhenExit = false;
+        ICachedPage page = null;
+        if (pagesPinned) {
+            page = pages[pageId];
+        } else {
+            page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId + 1), false);
+            unpinWhenExit = true;
+        }
+        ByteBuffer buffer = page.getBuffer();
+
+        try {
+            for (int i = 1; i < numHashes; ++i) {
+                hash = Math.abs((hashes[0] + i * hashes[1]) % NUM_BITS_PER_BLOCK);
+                int byteIndex = (int) ((hash + groupStartIndex) >> 3); // divide 8
+                byte b = buffer.get(byteIndex);
+                int bitIndex = (int) (hash & 0x07); // mod 8
+                if (!((b & (1L << bitIndex)) != 0)) {
+                    return false;
+                }
+            }
+        } finally {
+            if (unpinWhenExit) {
+                bufferCache.unpin(page);
+            }
+        }
+        return true;
+
+    }
+
+    // membership check for legacy bloom filters
+    private boolean legacyContains(long[] hashes) throws HyracksDataException {
         for (int i = 0; i < numHashes; ++i) {
             long hash = Math.abs((hashes[0] + i * hashes[1]) % numBits);
 
@@ -138,6 +222,7 @@
             numHashes = 0;
             numElements = 0;
             numBits = 0;
+            version = DEFAULT_BLOOM_FILTER_VERSION;
             return;
         }
         ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), false);
@@ -147,6 +232,7 @@
             numHashes = metaPage.getBuffer().getInt(NUM_HASHES_USED_OFFSET);
             numElements = metaPage.getBuffer().getLong(NUM_ELEMENTS_OFFSET);
             numBits = metaPage.getBuffer().getLong(NUM_BITS_OFFSET);
+            version = metaPage.getBuffer().getInt(VERSION_OFFSET);
         } finally {
             metaPage.releaseReadLatch();
             bufferCache.unpin(metaPage);
@@ -157,6 +243,9 @@
         if (!isActivated) {
             throw HyracksDataException.create(ErrorCode.CANNOT_DEACTIVATE_INACTIVE_BLOOM_FILTER);
         }
+        if (pagesPinned) {
+            throw HyracksDataException.create(ErrorCode.CANNOT_DEACTIVATE_PINNED_BLOOM_FILTER);
+        }
         bufferCache.closeFile(fileId);
         isActivated = false;
     }
@@ -240,6 +329,7 @@
             metaDataPage.getBuffer().putInt(NUM_HASHES_USED_OFFSET, numHashes);
             metaDataPage.getBuffer().putLong(NUM_ELEMENTS_OFFSET, numElements);
             metaDataPage.getBuffer().putLong(NUM_BITS_OFFSET, numBits);
+            metaDataPage.getBuffer().putInt(VERSION_OFFSET, BLOCKED_BLOOM_FILTER_VERSION);
         }
 
         @Override
@@ -248,15 +338,21 @@
                 throw HyracksDataException.create(ErrorCode.CANNOT_ADD_TUPLES_TO_DUMMY_BLOOM_FILTER);
             }
             MurmurHash128Bit.hash3_x64_128(tuple, keyFields, SEED, hashes);
-            for (int i = 0; i < numHashes; ++i) {
-                long hash = Math.abs((hashes[0] + i * hashes[1]) % numBits);
-                ICachedPage page = pages[(int) (hash / numBitsPerPage)];
-                ByteBuffer buffer = page.getBuffer();
-                int byteIndex = (int) (hash % numBitsPerPage) >> 3; // divide by 8
-                byte b = buffer.get(byteIndex);
-                int bitIndex = (int) (hash % numBitsPerPage) & 0x07; // mod 8
-                b = (byte) (b | (1 << bitIndex));
 
+            long hash = Math.abs(hashes[0] % numBits);
+            long groupId = hash / NUM_BITS_PER_BLOCK;
+            int pageId = (int) (groupId / numBlocksPerPage);
+            long groupStartIndex = (groupId % numBlocksPerPage) * NUM_BITS_PER_BLOCK;
+
+            ICachedPage page = pages[pageId];
+            ByteBuffer buffer = page.getBuffer();
+
+            for (int i = 1; i < numHashes; ++i) {
+                hash = Math.abs((hashes[0] + i * hashes[1]) % NUM_BITS_PER_BLOCK);
+                int byteIndex = (int) ((hash + groupStartIndex) >> 3); // divide 8
+                byte b = buffer.get(byteIndex);
+                int bitIndex = (int) (hash & 0x07); // mod 8
+                b = (byte) (b | (1 << bitIndex));
                 buffer.put(byteIndex, b);
             }
         }
@@ -273,6 +369,7 @@
             BloomFilter.this.numHashes = numHashes;
             BloomFilter.this.numElements = numElements;
             BloomFilter.this.numPages = numPages;
+            BloomFilter.this.version = BLOCKED_BLOOM_FILTER_VERSION;
         }
 
         @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/org/apache/hyracks/storage/am/bloomfilter/BloomFilterTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/org/apache/hyracks/storage/am/bloomfilter/BloomFilterTest.java
index c6652c3..f720d6f 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/org/apache/hyracks/storage/am/bloomfilter/BloomFilterTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/org/apache/hyracks/storage/am/bloomfilter/BloomFilterTest.java
@@ -97,6 +97,7 @@
         }
         builder.end();
 
+        bf.pinAllPages();
         // Check all the inserted tuples can be found.
 
         long[] hashes = BloomFilter.createHashArray();
@@ -104,7 +105,7 @@
             TupleUtils.createIntegerTuple(tupleBuilder, tuple, keys.get(i), i);
             Assert.assertTrue(bf.contains(tuple, hashes));
         }
-
+        bf.unpinAllPages();
         bf.deactivate();
         bf.destroy();
     }
@@ -157,12 +158,14 @@
         }
         builder.end();
 
+        bf.pinAllPages();
         long[] hashes = BloomFilter.createHashArray();
         for (int i = 0; i < numElements; ++i) {
             TupleUtils.createTuple(tupleBuilder, tuple, fieldSerdes, s1.get(i), s2.get(i), i, s3.get(i), s4.get(i));
             Assert.assertTrue(bf.contains(tuple, hashes));
         }
 
+        bf.unpinAllPages();
         bf.deactivate();
         bf.destroy();
     }