[ASTERIXDB-2186][STO] Cache-friendly Bloom Filter
- user model changes: no
- storage format changes: yes. Change the format of bloom filter
- interface changes: no
Details:
- Introduce blocked bloom filter to guarantee only 1 random memory
access is required during each membership check. It improves bloom
filter performance by 2x - 4x, depending on the ratio of positive
queries.
- For legacy bloom filters, we fall back to previous implementation
based on the stored version in the metadata.
- Add pinAllPages/unpinAllPages method to reduce pin/unpin overhead.
Change-Id: I0e8e0db9b60d5addfaf61ebb372a1bcb2d2d5957
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2201
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index eaf9bbf..b6d4f6b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -139,6 +139,7 @@
public static final int VBC_ALREADY_CLOSED = 103;
public static final int INDEX_DOES_NOT_EXIST = 104;
public static final int CANNOT_DROP_IN_USE_INDEX = 105;
+ public static final int CANNOT_DEACTIVATE_PINNED_BLOOM_FILTER = 106;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 1f82a17..6254b86 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -122,5 +122,6 @@
103 = Failed to close virtual buffer cache since it is already closed
104 = Index does not exist
105 = Cannot drop in-use index (%1$s)
+106 = Failed to deactivate the bloom filter since it is pinned by other users
10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomCalculations.java b/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomCalculations.java
index 5b387f4..e0cf1a80 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomCalculations.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomCalculations.java
@@ -43,12 +43,9 @@
* Each cell (i,j) the false positive rate determined by using i buckets per
* element and j hash functions.
*/
- static final double[][] probs = new double[][] {
- { 1.0 }, // dummy row representing 0 buckets per element
+ static final double[][] probs = new double[][] { { 1.0 }, // dummy row representing 0 buckets per element
{ 1.0, 1.0 }, // dummy row representing 1 buckets per element
- { 1.0, 0.393, 0.400 },
- { 1.0, 0.283, 0.237, 0.253 },
- { 1.0, 0.221, 0.155, 0.147, 0.160 },
+ { 1.0, 0.393, 0.400 }, { 1.0, 0.283, 0.237, 0.253 }, { 1.0, 0.221, 0.155, 0.147, 0.160 },
{ 1.0, 0.181, 0.109, 0.092, 0.092, 0.101 }, // 5
{ 1.0, 0.154, 0.0804, 0.0609, 0.0561, 0.0578, 0.0638 },
{ 1.0, 0.133, 0.0618, 0.0423, 0.0359, 0.0347, 0.0364 },
@@ -63,8 +60,8 @@
{ 1.0, 0.0606, 0.0138, 0.005, 0.00239, 0.00139, 0.000935, 0.000702, 0.000574, 0.000505, 0.00047, 0.000459 },
{ 1.0, 0.0571, 0.0123, 0.00423, 0.00193, 0.00107, 0.000692, 0.000499, 0.000394, 0.000335, 0.000302,
0.000287, 0.000284 },
- { 1.0, 0.054, 0.0111, 0.00362, 0.00158, 0.000839, 0.000519, 0.00036, 0.000275, 0.000226, 0.000198,
- 0.000183, 0.000176 },
+ { 1.0, 0.054, 0.0111, 0.00362, 0.00158, 0.000839, 0.000519, 0.00036, 0.000275, 0.000226, 0.000198, 0.000183,
+ 0.000176 },
{ 1.0, 0.0513, 0.00998, 0.00312, 0.0013, 0.000663, 0.000394, 0.000264, 0.000194, 0.000155, 0.000132,
0.000118, 0.000111, 0.000109 },
{ 1.0, 0.0488, 0.00906, 0.0027, 0.00108, 0.00053, 0.000303, 0.000196, 0.00014, 0.000108, 8.89e-05,
@@ -148,8 +145,9 @@
K--;
}
- return new BloomFilterSpecification(K, bucketsPerElement);
- }
+ // we allocate one more bucket per element to compensate the effect introduced by using blocked bloom filter
+ // a detail analysis can be found at https://dl.acm.org/citation.cfm?id=1594230
+ return new BloomFilterSpecification(K, bucketsPerElement + 1); }
/**
* Calculates the maximum number of buckets per element that this implementation
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
index 68b96a3..3d8782a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
@@ -38,6 +38,14 @@
private static final int NUM_HASHES_USED_OFFSET = NUM_PAGES_OFFSET + 4; // 4
private static final int NUM_ELEMENTS_OFFSET = NUM_HASHES_USED_OFFSET + 4; // 8
private static final int NUM_BITS_OFFSET = NUM_ELEMENTS_OFFSET + 8; // 12
+ private static final int VERSION_OFFSET = NUM_BITS_OFFSET + 8; // 20
+
+ // we use cache line size as the block size (64 bytes)
+ private static final int NUM_BITS_PER_BLOCK = 64 * 8;
+
+ private static final int DEFAULT_BLOOM_FILTER_VERSION = 0;
+
+ private static final int BLOCKED_BLOOM_FILTER_VERSION = 1;
private final IBufferCache bufferCache;
private final FileReference file;
@@ -49,7 +57,13 @@
private int numHashes;
private long numElements;
private long numBits;
+ // keep trace of the version of the bloomfilter to be backward compatible
+ private int version;
private final int numBitsPerPage;
+ private final int numBlocksPerPage;
+ private ICachedPage[] pages;
+ private int pinCount = 0;
+ private boolean pagesPinned = false;
private static final byte[] ZERO_BUFFER = new byte[131072]; // 128kb
private static final long SEED = 0L;
@@ -58,6 +72,7 @@
this.file = file;
this.keyFields = keyFields;
this.numBitsPerPage = bufferCache.getPageSize() * Byte.SIZE;
+ this.numBlocksPerPage = this.numBitsPerPage / NUM_BITS_PER_BLOCK;
}
public int getFileId() {
@@ -68,6 +83,30 @@
return file;
}
+ public synchronized void pinAllPages() throws HyracksDataException {
+ if (pinCount == 0) {
+ // first time pin
+ if (pages == null) {
+ pages = new ICachedPage[numPages];
+ }
+ for (int i = 0; i < numPages; i++) {
+ pages[i] = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, i + 1), false);
+ }
+ pagesPinned = true;
+ }
+ pinCount++;
+ }
+
+ public synchronized void unpinAllPages() throws HyracksDataException {
+ if (pinCount == 1) {
+ for (int i = 0; i < numPages; i++) {
+ bufferCache.unpin(pages[i]);
+ }
+ pagesPinned = false;
+ }
+ pinCount--;
+ }
+
public int getNumPages() throws HyracksDataException {
if (!isActivated) {
activate();
@@ -87,6 +126,51 @@
return false;
}
MurmurHash128Bit.hash3_x64_128(tuple, keyFields, SEED, hashes);
+ if (version == BLOCKED_BLOOM_FILTER_VERSION) {
+ return blockContains(hashes);
+ } else {
+ return legacyContains(hashes);
+ }
+ }
+
+ private boolean blockContains(long[] hashes) throws HyracksDataException {
+ // take first hash to compute block id
+ long hash = Math.abs(hashes[0] % numBits);
+ long blockId = hash / NUM_BITS_PER_BLOCK;
+ int pageId = (int) (blockId / numBlocksPerPage);
+ long groupStartIndex = (blockId % numBlocksPerPage) * NUM_BITS_PER_BLOCK;
+
+ boolean unpinWhenExit = false;
+ ICachedPage page = null;
+ if (pagesPinned) {
+ page = pages[pageId];
+ } else {
+ page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId + 1), false);
+ unpinWhenExit = true;
+ }
+ ByteBuffer buffer = page.getBuffer();
+
+ try {
+ for (int i = 1; i < numHashes; ++i) {
+ hash = Math.abs((hashes[0] + i * hashes[1]) % NUM_BITS_PER_BLOCK);
+ int byteIndex = (int) ((hash + groupStartIndex) >> 3); // divide 8
+ byte b = buffer.get(byteIndex);
+ int bitIndex = (int) (hash & 0x07); // mod 8
+ if (!((b & (1L << bitIndex)) != 0)) {
+ return false;
+ }
+ }
+ } finally {
+ if (unpinWhenExit) {
+ bufferCache.unpin(page);
+ }
+ }
+ return true;
+
+ }
+
+ // membership check for legacy bloom filters
+ private boolean legacyContains(long[] hashes) throws HyracksDataException {
for (int i = 0; i < numHashes; ++i) {
long hash = Math.abs((hashes[0] + i * hashes[1]) % numBits);
@@ -138,6 +222,7 @@
numHashes = 0;
numElements = 0;
numBits = 0;
+ version = DEFAULT_BLOOM_FILTER_VERSION;
return;
}
ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), false);
@@ -147,6 +232,7 @@
numHashes = metaPage.getBuffer().getInt(NUM_HASHES_USED_OFFSET);
numElements = metaPage.getBuffer().getLong(NUM_ELEMENTS_OFFSET);
numBits = metaPage.getBuffer().getLong(NUM_BITS_OFFSET);
+ version = metaPage.getBuffer().getInt(VERSION_OFFSET);
} finally {
metaPage.releaseReadLatch();
bufferCache.unpin(metaPage);
@@ -157,6 +243,9 @@
if (!isActivated) {
throw HyracksDataException.create(ErrorCode.CANNOT_DEACTIVATE_INACTIVE_BLOOM_FILTER);
}
+ if (pagesPinned) {
+ throw HyracksDataException.create(ErrorCode.CANNOT_DEACTIVATE_PINNED_BLOOM_FILTER);
+ }
bufferCache.closeFile(fileId);
isActivated = false;
}
@@ -240,6 +329,7 @@
metaDataPage.getBuffer().putInt(NUM_HASHES_USED_OFFSET, numHashes);
metaDataPage.getBuffer().putLong(NUM_ELEMENTS_OFFSET, numElements);
metaDataPage.getBuffer().putLong(NUM_BITS_OFFSET, numBits);
+ metaDataPage.getBuffer().putInt(VERSION_OFFSET, BLOCKED_BLOOM_FILTER_VERSION);
}
@Override
@@ -248,15 +338,21 @@
throw HyracksDataException.create(ErrorCode.CANNOT_ADD_TUPLES_TO_DUMMY_BLOOM_FILTER);
}
MurmurHash128Bit.hash3_x64_128(tuple, keyFields, SEED, hashes);
- for (int i = 0; i < numHashes; ++i) {
- long hash = Math.abs((hashes[0] + i * hashes[1]) % numBits);
- ICachedPage page = pages[(int) (hash / numBitsPerPage)];
- ByteBuffer buffer = page.getBuffer();
- int byteIndex = (int) (hash % numBitsPerPage) >> 3; // divide by 8
- byte b = buffer.get(byteIndex);
- int bitIndex = (int) (hash % numBitsPerPage) & 0x07; // mod 8
- b = (byte) (b | (1 << bitIndex));
+ long hash = Math.abs(hashes[0] % numBits);
+ long groupId = hash / NUM_BITS_PER_BLOCK;
+ int pageId = (int) (groupId / numBlocksPerPage);
+ long groupStartIndex = (groupId % numBlocksPerPage) * NUM_BITS_PER_BLOCK;
+
+ ICachedPage page = pages[pageId];
+ ByteBuffer buffer = page.getBuffer();
+
+ for (int i = 1; i < numHashes; ++i) {
+ hash = Math.abs((hashes[0] + i * hashes[1]) % NUM_BITS_PER_BLOCK);
+ int byteIndex = (int) ((hash + groupStartIndex) >> 3); // divide 8
+ byte b = buffer.get(byteIndex);
+ int bitIndex = (int) (hash & 0x07); // mod 8
+ b = (byte) (b | (1 << bitIndex));
buffer.put(byteIndex, b);
}
}
@@ -273,6 +369,7 @@
BloomFilter.this.numHashes = numHashes;
BloomFilter.this.numElements = numElements;
BloomFilter.this.numPages = numPages;
+ BloomFilter.this.version = BLOCKED_BLOOM_FILTER_VERSION;
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/org/apache/hyracks/storage/am/bloomfilter/BloomFilterTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/org/apache/hyracks/storage/am/bloomfilter/BloomFilterTest.java
index c6652c3..f720d6f 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/org/apache/hyracks/storage/am/bloomfilter/BloomFilterTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/org/apache/hyracks/storage/am/bloomfilter/BloomFilterTest.java
@@ -97,6 +97,7 @@
}
builder.end();
+ bf.pinAllPages();
// Check all the inserted tuples can be found.
long[] hashes = BloomFilter.createHashArray();
@@ -104,7 +105,7 @@
TupleUtils.createIntegerTuple(tupleBuilder, tuple, keys.get(i), i);
Assert.assertTrue(bf.contains(tuple, hashes));
}
-
+ bf.unpinAllPages();
bf.deactivate();
bf.destroy();
}
@@ -157,12 +158,14 @@
}
builder.end();
+ bf.pinAllPages();
long[] hashes = BloomFilter.createHashArray();
for (int i = 0; i < numElements; ++i) {
TupleUtils.createTuple(tupleBuilder, tuple, fieldSerdes, s1.get(i), s2.get(i), i, s3.get(i), s4.get(i));
Assert.assertTrue(bf.contains(tuple, hashes));
}
+ bf.unpinAllPages();
bf.deactivate();
bf.destroy();
}