Merge branch 'zheilbron/hyracks_msr_demo' of https://code.google.com/p/hyracks into zheilbron/hyracks_msr_demo
diff --git a/hyracks/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilter.java b/hyracks/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
index 19200ee..fdfa39a 100644
--- a/hyracks/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
+++ b/hyracks/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
@@ -16,7 +16,6 @@
 package edu.uci.ics.hyracks.storage.am.bloomfilter.impls;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
@@ -49,8 +48,6 @@
     private long numBits;
     private final int numBitsPerPage;
     private final static byte[] ZERO_BUFFER = new byte[131072]; // 128kb
-
-    private final ArrayList<ICachedPage> bloomFilterPages = new ArrayList<ICachedPage>();
     private final static long SEED = 0L;
 
     public BloomFilter(IBufferCache bufferCache, IFileMapProvider fileMapProvider, FileReference file, int[] keyFields)
@@ -84,19 +81,29 @@
         return numElements;
     }
 
-    public boolean contains(ITupleReference tuple, long[] hashes) {
+    public boolean contains(ITupleReference tuple, long[] hashes) throws HyracksDataException {
         MurmurHash128Bit.hash3_x64_128(tuple, keyFields, SEED, hashes);
         for (int i = 0; i < numHashes; ++i) {
             long hash = Math.abs((hashes[0] + (long) i * hashes[1]) % numBits);
 
-            ByteBuffer buffer = bloomFilterPages.get((int) (hash / numBitsPerPage)).getBuffer();
-            int byteIndex = (int) (hash % numBitsPerPage) >> 3; // divide by 8
-            byte b = buffer.get(byteIndex);
-            int bitIndex = (int) (hash % numBitsPerPage) & 0x07; // mod 8
+            // Page 0 is the filter's metadata page, so the bit pages start at page id 1 (hence the + 1).
+            ICachedPage page = bufferCache.pin(
+                    BufferedFileHandle.getDiskPageId(fileId, (int) (hash / numBitsPerPage) + 1), false);
+            page.acquireReadLatch();
+            try {
+                ByteBuffer buffer = page.getBuffer();
+                int byteIndex = (int) (hash % numBitsPerPage) >> 3; // divide by 8
+                byte b = buffer.get(byteIndex);
+                int bitIndex = (int) (hash % numBitsPerPage) & 0x07; // mod 8
 
-            if (!((b & (1L << bitIndex)) != 0)) {
-                return false;
+                if (!((b & (1L << bitIndex)) != 0)) {
+                    return false;
+                }
+            } finally {
+                page.releaseReadLatch();
+                bufferCache.unpin(page);
             }
+
         }
         return true;
     }
@@ -129,12 +136,15 @@
         prepareFile();
         ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), true);
         metaPage.acquireWriteLatch();
-        metaPage.getBuffer().putInt(NUM_PAGES_OFFSET, 0);
-        metaPage.getBuffer().putInt(NUM_HASHES_USED_OFFSET, 0);
-        metaPage.getBuffer().putLong(NUM_ELEMENTS_OFFSET, 0L);
-        metaPage.getBuffer().putLong(NUM_BITS_OFFSET, 0L);
-        metaPage.releaseWriteLatch(true);
-        bufferCache.unpin(metaPage);
+        try {
+            metaPage.getBuffer().putInt(NUM_PAGES_OFFSET, 0);
+            metaPage.getBuffer().putInt(NUM_HASHES_USED_OFFSET, 0);
+            metaPage.getBuffer().putLong(NUM_ELEMENTS_OFFSET, 0L);
+            metaPage.getBuffer().putLong(NUM_BITS_OFFSET, 0L);
+        } finally {
+            metaPage.releaseWriteLatch(true);
+            bufferCache.unpin(metaPage);
+        }
         bufferCache.closeFile(fileId);
     }
 
@@ -145,36 +155,27 @@
 
         prepareFile();
         readBloomFilterMetaData();
-
-        int currentPageId = 1;
-        while (currentPageId <= numPages) {
-            ICachedPage page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, currentPageId), false);
-            bloomFilterPages.add(page);
-            ++currentPageId;
-        }
         isActivated = true;
     }
 
     private void readBloomFilterMetaData() throws HyracksDataException {
         ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), false);
         metaPage.acquireReadLatch();
-        numPages = metaPage.getBuffer().getInt(NUM_PAGES_OFFSET);
-        numHashes = metaPage.getBuffer().getInt(NUM_HASHES_USED_OFFSET);
-        numElements = metaPage.getBuffer().getLong(NUM_ELEMENTS_OFFSET);
-        numBits = metaPage.getBuffer().getLong(NUM_BITS_OFFSET);
-        metaPage.releaseReadLatch();
-        bufferCache.unpin(metaPage);
+        try {
+            numPages = metaPage.getBuffer().getInt(NUM_PAGES_OFFSET);
+            numHashes = metaPage.getBuffer().getInt(NUM_HASHES_USED_OFFSET);
+            numElements = metaPage.getBuffer().getLong(NUM_ELEMENTS_OFFSET);
+            numBits = metaPage.getBuffer().getLong(NUM_BITS_OFFSET);
+        } finally {
+            metaPage.releaseReadLatch();
+            bufferCache.unpin(metaPage);
+        }
     }
 
     public synchronized void deactivate() throws HyracksDataException {
         if (!isActivated) {
             return;
         }
-
-        for (int i = 0; i < numPages; ++i) {
-            bufferCache.unpin(bloomFilterPages.get(i));
-        }
-        bloomFilterPages.clear();
         bufferCache.closeFile(fileId);
         isActivated = false;
     }
@@ -223,8 +224,12 @@
             while (currentPageId <= numPages) {
                 ICachedPage page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, currentPageId), true);
                 page.acquireWriteLatch();
-                initPage(page.getBuffer().array());
-                bloomFilterPages.add(page);
+                try {
+                    initPage(page.getBuffer().array());
+                } finally {
+                    page.releaseWriteLatch(true);
+                    bufferCache.unpin(page);
+                }
                 ++currentPageId;
             }
         }
@@ -245,12 +250,15 @@
         private void persistBloomFilterMetaData() throws HyracksDataException {
             ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), false);
             metaPage.acquireWriteLatch();
-            metaPage.getBuffer().putInt(NUM_PAGES_OFFSET, numPages);
-            metaPage.getBuffer().putInt(NUM_HASHES_USED_OFFSET, numHashes);
-            metaPage.getBuffer().putLong(NUM_ELEMENTS_OFFSET, numElements);
-            metaPage.getBuffer().putLong(NUM_BITS_OFFSET, numBits);
-            metaPage.releaseWriteLatch(true);
-            bufferCache.unpin(metaPage);
+            try {
+                metaPage.getBuffer().putInt(NUM_PAGES_OFFSET, numPages);
+                metaPage.getBuffer().putInt(NUM_HASHES_USED_OFFSET, numHashes);
+                metaPage.getBuffer().putLong(NUM_ELEMENTS_OFFSET, numElements);
+                metaPage.getBuffer().putLong(NUM_BITS_OFFSET, numBits);
+            } finally {
+                metaPage.releaseWriteLatch(true);
+                bufferCache.unpin(metaPage);
+            }
         }
 
         @Override
@@ -259,22 +267,28 @@
             for (int i = 0; i < numHashes; ++i) {
                 long hash = Math.abs((hashes[0] + (long) i * hashes[1]) % numBits);
 
-                ByteBuffer buffer = bloomFilterPages.get((int) (hash / numBitsPerPage)).getBuffer();
-                int byteIndex = (int) (hash % numBitsPerPage) >> 3; // divide by 8
-                byte b = buffer.get(byteIndex);
-                int bitIndex = (int) (hash % numBitsPerPage) & 0x07; // mod 8
-                b = (byte) (b | (1 << bitIndex));
+                // Page 0 is the filter's metadata page, so the bit pages start at page id 1 (hence the + 1).
+                ICachedPage page = bufferCache.pin(
+                        BufferedFileHandle.getDiskPageId(fileId, (int) (hash / numBitsPerPage) + 1), false);
+                page.acquireWriteLatch();
+                try {
+                    ByteBuffer buffer = page.getBuffer();
+                    int byteIndex = (int) (hash % numBitsPerPage) >> 3; // divide by 8
+                    byte b = buffer.get(byteIndex);
+                    int bitIndex = (int) (hash % numBitsPerPage) & 0x07; // mod 8
+                    b = (byte) (b | (1 << bitIndex));
 
-                buffer.put(byteIndex, b);
+                    buffer.put(byteIndex, b);
+                } finally {
+                    page.releaseWriteLatch(true);
+                    bufferCache.unpin(page);
+                }
+
             }
         }
 
         @Override
         public void end() throws HyracksDataException, IndexException {
-            for (int i = 0; i < numPages; ++i) {
-                ICachedPage page = bloomFilterPages.get(i);
-                page.releaseWriteLatch(true);
-            }
         }
 
     }