Merge branch 'zheilbron/hyracks_msr_demo' of https://code.google.com/p/hyracks into zheilbron/hyracks_msr_demo
diff --git a/hyracks/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilter.java b/hyracks/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
index 19200ee..fdfa39a 100644
--- a/hyracks/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
+++ b/hyracks/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
@@ -16,7 +16,6 @@
package edu.uci.ics.hyracks.storage.am.bloomfilter.impls;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
@@ -49,8 +48,6 @@
private long numBits;
private final int numBitsPerPage;
private final static byte[] ZERO_BUFFER = new byte[131072]; // 128kb
-
- private final ArrayList<ICachedPage> bloomFilterPages = new ArrayList<ICachedPage>();
private final static long SEED = 0L;
public BloomFilter(IBufferCache bufferCache, IFileMapProvider fileMapProvider, FileReference file, int[] keyFields)
@@ -84,19 +81,29 @@
return numElements;
}
- public boolean contains(ITupleReference tuple, long[] hashes) {
+ public boolean contains(ITupleReference tuple, long[] hashes) throws HyracksDataException {
MurmurHash128Bit.hash3_x64_128(tuple, keyFields, SEED, hashes);
for (int i = 0; i < numHashes; ++i) {
long hash = Math.abs((hashes[0] + (long) i * hashes[1]) % numBits);
- ByteBuffer buffer = bloomFilterPages.get((int) (hash / numBitsPerPage)).getBuffer();
- int byteIndex = (int) (hash % numBitsPerPage) >> 3; // divide by 8
- byte b = buffer.get(byteIndex);
- int bitIndex = (int) (hash % numBitsPerPage) & 0x07; // mod 8
+ // we increment the page id by one, since the metadata page id of the filter is 0.
+ ICachedPage page = bufferCache.pin(
+ BufferedFileHandle.getDiskPageId(fileId, (int) (hash / numBitsPerPage) + 1), false);
+ page.acquireReadLatch();
+ try {
+ ByteBuffer buffer = page.getBuffer();
+ int byteIndex = (int) (hash % numBitsPerPage) >> 3; // divide by 8
+ byte b = buffer.get(byteIndex);
+ int bitIndex = (int) (hash % numBitsPerPage) & 0x07; // mod 8
- if (!((b & (1L << bitIndex)) != 0)) {
- return false;
+ if (!((b & (1L << bitIndex)) != 0)) {
+ return false;
+ }
+ } finally {
+ page.releaseReadLatch();
+ bufferCache.unpin(page);
}
+
}
return true;
}
@@ -129,12 +136,15 @@
prepareFile();
ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), true);
metaPage.acquireWriteLatch();
- metaPage.getBuffer().putInt(NUM_PAGES_OFFSET, 0);
- metaPage.getBuffer().putInt(NUM_HASHES_USED_OFFSET, 0);
- metaPage.getBuffer().putLong(NUM_ELEMENTS_OFFSET, 0L);
- metaPage.getBuffer().putLong(NUM_BITS_OFFSET, 0L);
- metaPage.releaseWriteLatch(true);
- bufferCache.unpin(metaPage);
+ try {
+ metaPage.getBuffer().putInt(NUM_PAGES_OFFSET, 0);
+ metaPage.getBuffer().putInt(NUM_HASHES_USED_OFFSET, 0);
+ metaPage.getBuffer().putLong(NUM_ELEMENTS_OFFSET, 0L);
+ metaPage.getBuffer().putLong(NUM_BITS_OFFSET, 0L);
+ } finally {
+ metaPage.releaseWriteLatch(true);
+ bufferCache.unpin(metaPage);
+ }
bufferCache.closeFile(fileId);
}
@@ -145,36 +155,27 @@
prepareFile();
readBloomFilterMetaData();
-
- int currentPageId = 1;
- while (currentPageId <= numPages) {
- ICachedPage page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, currentPageId), false);
- bloomFilterPages.add(page);
- ++currentPageId;
- }
isActivated = true;
}
private void readBloomFilterMetaData() throws HyracksDataException {
ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), false);
metaPage.acquireReadLatch();
- numPages = metaPage.getBuffer().getInt(NUM_PAGES_OFFSET);
- numHashes = metaPage.getBuffer().getInt(NUM_HASHES_USED_OFFSET);
- numElements = metaPage.getBuffer().getLong(NUM_ELEMENTS_OFFSET);
- numBits = metaPage.getBuffer().getLong(NUM_BITS_OFFSET);
- metaPage.releaseReadLatch();
- bufferCache.unpin(metaPage);
+ try {
+ numPages = metaPage.getBuffer().getInt(NUM_PAGES_OFFSET);
+ numHashes = metaPage.getBuffer().getInt(NUM_HASHES_USED_OFFSET);
+ numElements = metaPage.getBuffer().getLong(NUM_ELEMENTS_OFFSET);
+ numBits = metaPage.getBuffer().getLong(NUM_BITS_OFFSET);
+ } finally {
+ metaPage.releaseReadLatch();
+ bufferCache.unpin(metaPage);
+ }
}
public synchronized void deactivate() throws HyracksDataException {
if (!isActivated) {
return;
}
-
- for (int i = 0; i < numPages; ++i) {
- bufferCache.unpin(bloomFilterPages.get(i));
- }
- bloomFilterPages.clear();
bufferCache.closeFile(fileId);
isActivated = false;
}
@@ -223,8 +224,12 @@
while (currentPageId <= numPages) {
ICachedPage page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, currentPageId), true);
page.acquireWriteLatch();
- initPage(page.getBuffer().array());
- bloomFilterPages.add(page);
+ try {
+ initPage(page.getBuffer().array());
+ } finally {
+ page.releaseWriteLatch(true);
+ bufferCache.unpin(page);
+ }
++currentPageId;
}
}
@@ -245,12 +250,15 @@
private void persistBloomFilterMetaData() throws HyracksDataException {
ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), false);
metaPage.acquireWriteLatch();
- metaPage.getBuffer().putInt(NUM_PAGES_OFFSET, numPages);
- metaPage.getBuffer().putInt(NUM_HASHES_USED_OFFSET, numHashes);
- metaPage.getBuffer().putLong(NUM_ELEMENTS_OFFSET, numElements);
- metaPage.getBuffer().putLong(NUM_BITS_OFFSET, numBits);
- metaPage.releaseWriteLatch(true);
- bufferCache.unpin(metaPage);
+ try {
+ metaPage.getBuffer().putInt(NUM_PAGES_OFFSET, numPages);
+ metaPage.getBuffer().putInt(NUM_HASHES_USED_OFFSET, numHashes);
+ metaPage.getBuffer().putLong(NUM_ELEMENTS_OFFSET, numElements);
+ metaPage.getBuffer().putLong(NUM_BITS_OFFSET, numBits);
+ } finally {
+ metaPage.releaseWriteLatch(true);
+ bufferCache.unpin(metaPage);
+ }
}
@Override
@@ -259,22 +267,28 @@
for (int i = 0; i < numHashes; ++i) {
long hash = Math.abs((hashes[0] + (long) i * hashes[1]) % numBits);
- ByteBuffer buffer = bloomFilterPages.get((int) (hash / numBitsPerPage)).getBuffer();
- int byteIndex = (int) (hash % numBitsPerPage) >> 3; // divide by 8
- byte b = buffer.get(byteIndex);
- int bitIndex = (int) (hash % numBitsPerPage) & 0x07; // mod 8
- b = (byte) (b | (1 << bitIndex));
+ // we increment the page id by one, since the metadata page id of the filter is 0.
+ ICachedPage page = bufferCache.pin(
+ BufferedFileHandle.getDiskPageId(fileId, (int) (hash / numBitsPerPage) + 1), false);
+ page.acquireWriteLatch();
+ try {
+ ByteBuffer buffer = page.getBuffer();
+ int byteIndex = (int) (hash % numBitsPerPage) >> 3; // divide by 8
+ byte b = buffer.get(byteIndex);
+ int bitIndex = (int) (hash % numBitsPerPage) & 0x07; // mod 8
+ b = (byte) (b | (1 << bitIndex));
- buffer.put(byteIndex, b);
+ buffer.put(byteIndex, b);
+ } finally {
+ page.releaseWriteLatch(true);
+ bufferCache.unpin(page);
+ }
+
}
}
@Override
public void end() throws HyracksDataException, IndexException {
- for (int i = 0; i < numPages; ++i) {
- ICachedPage page = bloomFilterPages.get(i);
- page.releaseWriteLatch(true);
- }
}
}