merged hyracks_lsm_tree into branch -r2702:2707
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree_bloom_filter@2708 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-bloomfilter/pom.xml b/hyracks-storage-am-bloomfilter/pom.xml
new file mode 100644
index 0000000..dab96f9
--- /dev/null
+++ b/hyracks-storage-am-bloomfilter/pom.xml
@@ -0,0 +1,42 @@
+<!-- Maven build descriptor for the hyracks-storage-am-bloomfilter module. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-bloomfilter</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <!-- Compile with Java 1.6 as both source and target level. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <!-- Compile-scope dependency on the common storage access-method module. -->
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-common</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <!-- JUnit is needed for tests only. -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.1</version>
+ <type>jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilter.java b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
new file mode 100644
index 0000000..69010f3
--- /dev/null
+++ b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.bloomfilter.impls;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+/**
+ * A bloom filter whose bit array lives in buffer-cache pages of a single file.
+ * <p>
+ * File layout: page {@code METADATA_PAGE_ID} stores the number of pages, the number of hashes, the number of
+ * elements and the number of bits; pages {@code 1..numPages} store the bits themselves.
+ * <p>
+ * Lifecycle: {@link #create(long, int)} writes the metadata page, {@link #activate()} reads it back and pins and
+ * write-latches every bit page, {@link #deactivate()} releases those pages again, and {@link #destroy()} deletes
+ * the file. {@link #add} and {@link #contains} operate on the pages pinned by {@link #activate()}.
+ */
+public class BloomFilter {
+
+    private final static int METADATA_PAGE_ID = 0;
+    private final static int NUM_PAGES_OFFSET = 0; // 0
+    private final static int NUM_HASHES_USED_OFFSET = NUM_PAGES_OFFSET + 4; // 4
+    private final static int NUM_ELEMENTS_OFFSET = NUM_HASHES_USED_OFFSET + 4; // 8
+    private final static int NUM_BITS_OFFSET = NUM_ELEMENTS_OFFSET + 8; // 16
+
+    private final static int NUM_BITS_PER_ELEMENT = 10;
+
+    private final IBufferCache bufferCache;
+    private final IFileMapProvider fileMapProvider;
+    private final FileReference file;
+    private final int[] keyFields;
+    private int fileId = -1;
+    private boolean isActivated = false;
+
+    private int numPages;
+    private int numHashes;
+    private long numElements;
+    private long numBits;
+    // Depends only on the buffer cache's page size, so it is fixed at construction time. It must be valid even
+    // when the filter is activated from an existing file without create() having been called on this instance.
+    private final int numBitsPerPage;
+
+    private final ArrayList<ICachedPage> bloomFilterPages = new ArrayList<ICachedPage>();
+    private final static long SEED = 0L;
+
+    /**
+     * @param bufferCache
+     *            buffer cache used to create, open, pin and delete the filter's file and pages
+     * @param fileMapProvider
+     *            maps {@code file} to the file id used with the buffer cache
+     * @param file
+     *            file backing this bloom filter
+     * @param keyFields
+     *            indexes of the tuple fields that are hashed
+     */
+    public BloomFilter(IBufferCache bufferCache, IFileMapProvider fileMapProvider, FileReference file, int[] keyFields)
+            throws HyracksDataException {
+        this.bufferCache = bufferCache;
+        this.fileMapProvider = fileMapProvider;
+        this.file = file;
+        this.keyFields = keyFields;
+        this.numBitsPerPage = bufferCache.getPageSize() * Byte.SIZE;
+    }
+
+    /** @return the buffer-cache file id, or -1 if the file has not been prepared yet (or was destroyed) */
+    public int getFileId() {
+        return fileId;
+    }
+
+    public FileReference getFileReference() {
+        return file;
+    }
+
+    /**
+     * @return the number of bit pages (the metadata page is not counted)
+     * @throws HyracksDataException
+     *             if the bloom filter is not activated
+     */
+    public int getNumPages() throws HyracksDataException {
+        if (!isActivated) {
+            throw new HyracksDataException("The bloom filter is not activated.");
+        }
+        return numPages;
+    }
+
+    /**
+     * @return the number of elements the filter was created for
+     * @throws HyracksDataException
+     *             if the bloom filter is not activated
+     */
+    public long getNumElements() throws HyracksDataException {
+        if (!isActivated) {
+            throw new HyracksDataException("The bloom filter is not activated.");
+        }
+        return numElements;
+    }
+
+    /**
+     * Sets the bits selected by the key fields of {@code tuple}.
+     *
+     * @param tuple
+     *            tuple whose key fields are hashed
+     * @param hashes
+     *            caller-provided scratch array of length 2; overwritten with the two base hashes
+     */
+    public void add(ITupleReference tuple, long[] hashes) {
+        MurmurHash128Bit.hash3_x64_128(tuple, keyFields, SEED, hashes);
+        for (int i = 0; i < numHashes; ++i) {
+            long hash = bitPosition(hashes, i);
+
+            ByteBuffer buffer = bloomFilterPages.get((int) (hash / numBitsPerPage)).getBuffer();
+            int bitInPage = (int) (hash % numBitsPerPage);
+            int byteIndex = bitInPage >> 3; // divide by 8
+            int bitIndex = bitInPage & 0x07; // mod 8
+
+            buffer.put(byteIndex, (byte) (buffer.get(byteIndex) | (1 << bitIndex)));
+        }
+    }
+
+    /**
+     * Tests whether all bits selected by the key fields of {@code tuple} are set.
+     *
+     * @param tuple
+     *            tuple whose key fields are hashed
+     * @param hashes
+     *            caller-provided scratch array of length 2; overwritten with the two base hashes
+     * @return {@code false} if at least one of the selected bits is clear, {@code true} otherwise
+     */
+    public boolean contains(ITupleReference tuple, long[] hashes) {
+        // An activated filter without bit pages was created for zero elements (numBits == 0). Nothing can have
+        // been added to it, and the modulo below would otherwise divide by zero.
+        if (isActivated && numPages == 0) {
+            return false;
+        }
+        MurmurHash128Bit.hash3_x64_128(tuple, keyFields, SEED, hashes);
+        for (int i = 0; i < numHashes; ++i) {
+            long hash = bitPosition(hashes, i);
+
+            ByteBuffer buffer = bloomFilterPages.get((int) (hash / numBitsPerPage)).getBuffer();
+            int bitInPage = (int) (hash % numBitsPerPage);
+            int byteIndex = bitInPage >> 3; // divide by 8
+            int bitIndex = bitInPage & 0x07; // mod 8
+
+            if ((buffer.get(byteIndex) & (1 << bitIndex)) == 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Derives the i-th bit position from the two base hashes as |(hashes[0] + i * hashes[1]) % numBits|.
+     * The remainder is strictly smaller in magnitude than numBits, so Math.abs cannot overflow here.
+     */
+    private long bitPosition(long[] hashes, int i) {
+        return Math.abs((hashes[0] + (long) i * hashes[1]) % numBits);
+    }
+
+    /**
+     * Registers the file with the buffer cache if it is not mapped yet, looks up its file id and opens it.
+     * If opening fails for a file that this call registered, the registration is reverted.
+     */
+    private void prepareFile() throws HyracksDataException {
+        synchronized (fileMapProvider) {
+            boolean fileIsMapped = fileMapProvider.isMapped(file);
+            if (!fileIsMapped) {
+                bufferCache.createFile(file);
+            }
+            fileId = fileMapProvider.lookupFileId(file);
+            try {
+                // Also creates the file if it doesn't exist yet.
+                bufferCache.openFile(fileId);
+            } catch (HyracksDataException e) {
+                // Revert state of buffer cache since file failed to open.
+                if (!fileIsMapped) {
+                    bufferCache.deleteFile(fileId, false);
+                }
+                throw e;
+            }
+        }
+    }
+
+    /** Writes numPages, numHashes, numElements and numBits to the metadata page. */
+    private void persistBloomFilterMetaData() throws HyracksDataException {
+        ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), false);
+        try {
+            metaPage.acquireWriteLatch();
+            try {
+                ByteBuffer buffer = metaPage.getBuffer();
+                buffer.putInt(NUM_PAGES_OFFSET, numPages);
+                buffer.putInt(NUM_HASHES_USED_OFFSET, numHashes);
+                buffer.putLong(NUM_ELEMENTS_OFFSET, numElements);
+                buffer.putLong(NUM_BITS_OFFSET, numBits);
+            } finally {
+                metaPage.releaseWriteLatch();
+            }
+        } finally {
+            // Always give the page back, even if latching or writing failed.
+            bufferCache.unpin(metaPage);
+        }
+    }
+
+    /** Loads numPages, numHashes, numElements and numBits from the metadata page. */
+    private void readBloomFilterMetaData() throws HyracksDataException {
+        ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), false);
+        try {
+            metaPage.acquireReadLatch();
+            try {
+                ByteBuffer buffer = metaPage.getBuffer();
+                numPages = buffer.getInt(NUM_PAGES_OFFSET);
+                numHashes = buffer.getInt(NUM_HASHES_USED_OFFSET);
+                numElements = buffer.getLong(NUM_ELEMENTS_OFFSET);
+                numBits = buffer.getLong(NUM_BITS_OFFSET);
+            } finally {
+                metaPage.releaseReadLatch();
+            }
+        } finally {
+            // Always give the page back, even if latching or reading failed.
+            bufferCache.unpin(metaPage);
+        }
+    }
+
+    /**
+     * Sizes the filter for {@code numElements} elements (NUM_BITS_PER_ELEMENT bits each) and persists the
+     * resulting metadata. The file is closed again afterwards; call {@link #activate()} before using the filter.
+     *
+     * @param numElements
+     *            number of elements the filter is sized for
+     * @param numHashes
+     *            number of bit positions derived per element
+     * @throws HyracksDataException
+     *             if the filter is activated, if the required number of pages exceeds Integer.MAX_VALUE, or if
+     *             the buffer cache fails
+     */
+    public synchronized void create(long numElements, int numHashes) throws HyracksDataException {
+        if (isActivated) {
+            throw new HyracksDataException("Failed to create the bloom filter since it is activated.");
+        }
+        this.numElements = numElements;
+        this.numHashes = numHashes;
+        numBits = numElements * NUM_BITS_PER_ELEMENT;
+        long tmp = (long) Math.ceil(numBits / (double) numBitsPerPage);
+        if (tmp > Integer.MAX_VALUE) {
+            throw new HyracksDataException("Cannot create a bloom filter with his huge number of pages.");
+        }
+        numPages = (int) tmp;
+
+        prepareFile();
+        try {
+            persistBloomFilterMetaData();
+        } finally {
+            // prepareFile() opened the file; do not leave it open if persisting the metadata fails.
+            bufferCache.closeFile(fileId);
+        }
+    }
+
+    /**
+     * Opens the file, reads the metadata and pins and write-latches every bit page. The pages stay pinned and
+     * latched until {@link #deactivate()}. Does nothing if the filter is already activated.
+     */
+    public synchronized void activate() throws HyracksDataException {
+        if (isActivated) {
+            return;
+        }
+
+        prepareFile();
+        readBloomFilterMetaData();
+
+        // Bit pages start at page id 1; page 0 is the metadata page.
+        for (int pageId = 1; pageId <= numPages; ++pageId) {
+            ICachedPage page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
+            page.acquireWriteLatch();
+            bloomFilterPages.add(page);
+        }
+        isActivated = true;
+    }
+
+    /**
+     * Releases the latch and pin of every bit page and closes the file. Does nothing if the filter is not
+     * activated.
+     */
+    public synchronized void deactivate() throws HyracksDataException {
+        if (!isActivated) {
+            return;
+        }
+
+        for (ICachedPage page : bloomFilterPages) {
+            page.releaseWriteLatch();
+            bufferCache.unpin(page);
+        }
+        bloomFilterPages.clear();
+        bufferCache.closeFile(fileId);
+        isActivated = false;
+    }
+
+    /**
+     * Deletes the backing file and, if the file was ever prepared, its buffer-cache registration.
+     *
+     * @throws HyracksDataException
+     *             if the filter is still activated or the buffer cache fails
+     */
+    public synchronized void destroy() throws HyracksDataException {
+        if (isActivated) {
+            throw new HyracksDataException("Failed to destroy the bloom filter since it is activated.");
+        }
+
+        file.delete();
+        if (fileId == -1) {
+            return;
+        }
+        bufferCache.deleteFile(fileId, false);
+        fileId = -1;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilterFactory.java b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilterFactory.java
new file mode 100644
index 0000000..8bd419a
--- /dev/null
+++ b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilterFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.bloomfilter.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+/**
+ * Builds {@link BloomFilter} instances that all share the same buffer cache, file map provider and key fields;
+ * only the backing file differs per instance.
+ */
+public class BloomFilterFactory {
+    private final IBufferCache bufferCache;
+    private final IFileMapProvider fileMapProvider;
+    private final int[] keyFields;
+
+    /**
+     * @param bufferCache
+     *            buffer cache handed to every created bloom filter
+     * @param fileMapProvider
+     *            file map provider handed to every created bloom filter
+     * @param keyFields
+     *            key field indexes handed to every created bloom filter
+     */
+    public BloomFilterFactory(IBufferCache bufferCache, IFileMapProvider fileMapProvider, int[] keyFields) {
+        this.keyFields = keyFields;
+        this.fileMapProvider = fileMapProvider;
+        this.bufferCache = bufferCache;
+    }
+
+    /**
+     * Creates a new bloom filter object on top of {@code file}.
+     *
+     * @param file
+     *            file backing the new bloom filter
+     * @return a new {@link BloomFilter} instance
+     * @throws HyracksDataException
+     *             if the {@link BloomFilter} constructor fails
+     */
+    public BloomFilter createBloomFiltertInstance(FileReference file) throws HyracksDataException {
+        final BloomFilter bloomFilter = new BloomFilter(bufferCache, fileMapProvider, file, keyFields);
+        return bloomFilter;
+    }
+}
diff --git a/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilterSpecification.java b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilterSpecification.java
new file mode 100644
index 0000000..c396f9b
--- /dev/null
+++ b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilterSpecification.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.bloomfilter.impls;
+
+/**
+ * Immutable pair of sizing parameters for a bloom filter: the number of elements and the number of hashes.
+ */
+public final class BloomFilterSpecification {
+    private final int numHashes;
+    private final long numElements;
+
+    /**
+     * @param numElements
+     *            number of elements
+     * @param numHashes
+     *            number of hashes
+     */
+    public BloomFilterSpecification(long numElements, int numHashes) {
+        this.numHashes = numHashes;
+        this.numElements = numElements;
+    }
+
+    /** @return the number of hashes given at construction */
+    public int getNumHashes() {
+        return this.numHashes;
+    }
+
+    /** @return the number of elements given at construction */
+    public long getNumElements() {
+        return this.numElements;
+    }
+}
diff --git a/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/MurmurHash128Bit.java b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/MurmurHash128Bit.java
new file mode 100644
index 0000000..b1ec77d
--- /dev/null
+++ b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/MurmurHash128Bit.java
@@ -0,0 +1,256 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.bloomfilter.impls;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * 128-bit MurmurHash3 (x64 flavour) computed over the key fields of an {@link ITupleReference}.
+ * <p>
+ * The idea of this class is borrowed from http://murmurhash.googlepages.com/ and the Cassandra source code.
+ * We changed the hash function to operate on ITupleReference instead of a byte array.
+ **/
+public class MurmurHash128Bit {
+
+    // Every key byte is read from the byte array of this field, at the start offset of the key field it belongs to.
+    // NOTE(review): this presumes all fields of a tuple share one backing array - confirm for every tuple type used.
+    private final static int DUMMY_FIELD = 0;
+
+    /** Rotates {@code v} left by {@code n} bits. */
+    public static long rotl64(long v, int n) {
+        return ((v << n) | (v >>> (64 - n)));
+    }
+
+    /** Final avalanche mix applied to each 64-bit half of the hash state. */
+    public static long fmix(long k) {
+        k ^= k >>> 33;
+        k *= 0xff51afd7ed558ccdL;
+        k ^= k >>> 33;
+        k *= 0xc4ceb9fe1a85ec53L;
+        k ^= k >>> 33;
+
+        return k;
+    }
+
+    /**
+     * Hashes the concatenation of the given key fields and stores the two 64-bit halves of the result in
+     * {@code hashes[0]} and {@code hashes[1]}.
+     * <p>
+     * Bytes are consumed front to back with a single cursor. Fields that are exhausted, or empty, are stepped
+     * over before each read, so zero-length key fields and an empty {@code keyFields} array are handled without
+     * indexing outside a field or outside {@code keyFields}.
+     *
+     * @param tuple
+     *            tuple providing the key bytes
+     * @param keyFields
+     *            indexes of the fields to hash, in hashing order
+     * @param seed
+     *            initial value of both halves of the hash state
+     * @param hashes
+     *            output array of length at least 2
+     */
+    public static void hash3_x64_128(ITupleReference tuple, int[] keyFields, long seed, long[] hashes) {
+        int length = 0;
+        for (int i = 0; i < keyFields.length; ++i) {
+            length += tuple.getFieldLength(keyFields[i]);
+        }
+        final int nblocks = length >> 4; // Process as 128-bit blocks.
+
+        long h1 = seed;
+        long h2 = seed;
+
+        final long c1 = 0x87c37b91114253d5L;
+        final long c2 = 0x4cf5ad432745937fL;
+
+        // Cursor over the logical concatenation of the key fields; shared by the body and the tail.
+        int currentFieldIndex = 0;
+        int bytePos = 0;
+
+        //----------
+        // body
+
+        for (int i = 0; i < nblocks; ++i) {
+            long k1 = 0L;
+            long k2 = 0L;
+            // Bytes 0-7 of the block form k1, bytes 8-15 form k2, both little-endian.
+            for (int j = 0; j < 16; ++j) {
+                // Exactly `length` bytes are read in total, so a field with unread bytes always exists here.
+                while (bytePos == tuple.getFieldLength(keyFields[currentFieldIndex])) {
+                    ++currentFieldIndex;
+                    bytePos = 0;
+                }
+                long b = (long) tuple.getFieldData(DUMMY_FIELD)[tuple.getFieldStart(keyFields[currentFieldIndex])
+                        + bytePos] & 0xff;
+                ++bytePos;
+                if (j < 8) {
+                    k1 |= b << (j << 3);
+                } else {
+                    k2 |= b << ((j - 8) << 3);
+                }
+            }
+
+            k1 *= c1;
+            k1 = rotl64(k1, 31);
+            k1 *= c2;
+            h1 ^= k1;
+
+            h1 = rotl64(h1, 27);
+            h1 += h2;
+            h1 = h1 * 5 + 0x52dce729;
+
+            k2 *= c2;
+            k2 = rotl64(k2, 33);
+            k2 *= c1;
+            h2 ^= k2;
+
+            h2 = rotl64(h2, 31);
+            h2 += h1;
+            h2 = h2 * 5 + 0x38495ab5;
+        }
+
+        //----------
+        // tail
+
+        long k1 = 0L;
+        long k2 = 0L;
+
+        final int tailLength = length & 15;
+        for (int t = 0; t < tailLength; ++t) {
+            while (bytePos == tuple.getFieldLength(keyFields[currentFieldIndex])) {
+                ++currentFieldIndex;
+                bytePos = 0;
+            }
+            // Unlike the body, tail bytes are sign-extended (no 0xff mask) and combined with XOR. This is kept
+            // exactly as it was, because masking here would change the hash values.
+            long b = tuple.getFieldData(DUMMY_FIELD)[tuple.getFieldStart(keyFields[currentFieldIndex]) + bytePos];
+            ++bytePos;
+            if (t < 8) {
+                k1 ^= b << (t << 3);
+            } else {
+                k2 ^= b << ((t - 8) << 3);
+            }
+        }
+        if (tailLength > 8) {
+            k2 *= c2;
+            k2 = rotl64(k2, 33);
+            k2 *= c1;
+            h2 ^= k2;
+        }
+        if (tailLength > 0) {
+            k1 *= c1;
+            k1 = rotl64(k1, 31);
+            k1 *= c2;
+            h1 ^= k1;
+        }
+
+        //----------
+        // finalization
+
+        h1 ^= length;
+        h2 ^= length;
+
+        h1 += h2;
+        h2 += h1;
+
+        h1 = fmix(h1);
+        h2 = fmix(h2);
+
+        h1 += h2;
+        h2 += h1;
+
+        hashes[0] = h1;
+        hashes[1] = h2;
+    }
+
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index 9bf4a4f..370e094 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -152,6 +152,9 @@
return;
}
+ if (fileId == 0) {
+ System.out.println();
+ }
bufferCache.closeFile(fileId);
freePageManager.close();
diff --git a/hyracks-storage-am-lsm-btree/pom.xml b/hyracks-storage-am-lsm-btree/pom.xml
index 17f3714..afef819 100644
--- a/hyracks-storage-am-lsm-btree/pom.xml
+++ b/hyracks-storage-am-lsm-btree/pom.xml
@@ -33,6 +33,13 @@
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-bloomfilter</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-lsm-common</artifactId>
<version>0.2.2-SNAPSHOT</version>
<type>jar</type>
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 4cd4974..6d76a86 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -23,8 +23,12 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory;
import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeBulkLoader;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
@@ -64,7 +68,6 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallbackWrapper;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMFlushOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
@@ -86,13 +89,16 @@
private final ITreeIndexFrameFactory deleteLeafFrameFactory;
private final IBinaryComparatorFactory[] cmpFactories;
+ private final int numHashes = 10;
+
public LSMBTree(IInMemoryBufferCache memBufferCache, IInMemoryFreePageManager memFreePageManager,
ITreeIndexFrameFactory interiorFrameFactory, ITreeIndexFrameFactory insertLeafFrameFactory,
ITreeIndexFrameFactory deleteLeafFrameFactory, ILSMIndexFileManager fileManager,
TreeIndexFactory<BTree> diskBTreeFactory, TreeIndexFactory<BTree> bulkLoadBTreeFactory,
- IFileMapProvider diskFileMapProvider, int fieldCount, IBinaryComparatorFactory[] cmpFactories,
- ILSMMergePolicy mergePolicy, ILSMOperationTrackerFactory opTrackerFactory,
- ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackProvider ioOpCallbackProvider) {
+ BloomFilterFactory bloomFilterFactory, IFileMapProvider diskFileMapProvider, int fieldCount,
+ IBinaryComparatorFactory[] cmpFactories, ILSMMergePolicy mergePolicy,
+ ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationScheduler ioScheduler,
+ ILSMIOOperationCallbackProvider ioOpCallbackProvider) {
super(memFreePageManager, diskBTreeFactory.getBufferCache(), fileManager, diskFileMapProvider, mergePolicy,
opTrackerFactory, ioScheduler, ioOpCallbackProvider);
mutableComponent = new LSMBTreeMutableComponent(new BTree(memBufferCache,
@@ -102,8 +108,8 @@
this.insertLeafFrameFactory = insertLeafFrameFactory;
this.deleteLeafFrameFactory = deleteLeafFrameFactory;
this.cmpFactories = cmpFactories;
- componentFactory = new LSMBTreeImmutableComponentFactory(diskBTreeFactory);
- bulkLoadComponentFactory = new LSMBTreeImmutableComponentFactory(bulkLoadBTreeFactory);
+ componentFactory = new LSMBTreeImmutableComponentFactory(diskBTreeFactory, bloomFilterFactory);
+ bulkLoadComponentFactory = new LSMBTreeImmutableComponentFactory(bulkLoadBTreeFactory, bloomFilterFactory);
}
@Override
@@ -135,14 +141,15 @@
throw new HyracksDataException(e);
}
for (LSMComponentFileReferences lsmComonentFileReference : validFileReferences) {
- LSMBTreeImmutableComponent btree;
+ LSMBTreeImmutableComponent component;
try {
- btree = createDiskComponent(componentFactory, lsmComonentFileReference.getInsertIndexFileReference(),
- false);
+ component = createDiskComponent(componentFactory,
+ lsmComonentFileReference.getInsertIndexFileReference(),
+ lsmComonentFileReference.getBloomFilterFileReference(), 0L, 0, false);
} catch (IndexException e) {
throw new HyracksDataException(e);
}
- immutableComponents.add(btree);
+ immutableComponents.add(component);
}
isActivated = true;
}
@@ -167,8 +174,11 @@
List<ILSMComponent> immutableComponents = componentsRef.get();
for (ILSMComponent c : immutableComponents) {
- BTree btree = (BTree) ((LSMBTreeImmutableComponent) c).getBTree();
+ LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) c;
+ BTree btree = component.getBTree();
+ BloomFilter bloomFilter = component.getBloomFilter();
btree.deactivate();
+ bloomFilter.deactivate();
}
mutableComponent.getBTree().deactivate();
mutableComponent.getBTree().destroy();
@@ -189,8 +199,9 @@
List<ILSMComponent> immutableComponents = componentsRef.get();
for (ILSMComponent c : immutableComponents) {
- BTree btree = (BTree) ((LSMBTreeImmutableComponent) c).getBTree();
- btree.destroy();
+ LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) c;
+ component.getBTree().destroy();
+ component.getBloomFilter().destroy();
}
mutableComponent.getBTree().destroy();
fileManager.deleteDirs();
@@ -205,9 +216,11 @@
List<ILSMComponent> immutableComponents = componentsRef.get();
mutableComponent.getBTree().clear();
for (ILSMComponent c : immutableComponents) {
- BTree btree = (BTree) ((LSMBTreeImmutableComponent) c).getBTree();
- btree.deactivate();
- btree.destroy();
+ LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) c;
+ component.getBloomFilter().deactivate();
+ component.getBTree().deactivate();
+ component.getBloomFilter().destroy();
+ component.getBTree().destroy();
}
immutableComponents.clear();
}
@@ -346,24 +359,44 @@
opCtx.setOperation(IndexOperation.FLUSH);
opCtx.getComponentHolder().add(flushingComponent);
ILSMIndexAccessorInternal flushAccessor = new LSMBTreeAccessor(lsmHarness, opCtx);
- ioScheduler.scheduleOperation(new LSMFlushOperation(flushAccessor, flushingComponent, componentFileRefs
- .getInsertIndexFileReference(), callback));
+ ioScheduler.scheduleOperation(new LSMBTreeFlushOperation(flushAccessor, flushingComponent, componentFileRefs
+ .getInsertIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), callback));
}
@Override
public ILSMComponent flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
- LSMFlushOperation flushOp = (LSMFlushOperation) operation;
+ LSMBTreeFlushOperation flushOp = (LSMBTreeFlushOperation) operation;
LSMBTreeMutableComponent flushingComponent = (LSMBTreeMutableComponent) flushOp.getFlushingComponent();
IIndexAccessor accessor = flushingComponent.getBTree().createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
- IIndexCursor scanCursor = accessor.createSearchCursor();
+
RangePredicate nullPred = new RangePredicate(null, null, true, true, null, null);
- accessor.search(scanCursor, nullPred);
- LSMBTreeImmutableComponent component = createDiskComponent(componentFactory, flushOp.getFlushTarget(), true);
+ IIndexCursor countingCursor = ((BTreeAccessor) accessor).createCountingSearchCursor();
+ accessor.search(countingCursor, nullPred);
+ long numElements = 0L;
+ try {
+ while (countingCursor.hasNext()) {
+ countingCursor.next();
+ ITupleReference countTuple = countingCursor.getTuple();
+ numElements = IntegerSerializerDeserializer.getInt(countTuple.getFieldData(0),
+ countTuple.getFieldStart(0));
+ }
+ } finally {
+ countingCursor.close();
+ }
+
+ LSMBTreeImmutableComponent component = createDiskComponent(componentFactory, flushOp.getBTreeFlushTarget(),
+ flushOp.getBloomFilterFlushTarget(), numElements, numHashes, true);
IIndexBulkLoader bulkLoader = component.getBTree().createBulkLoader(1.0f, false);
+ BloomFilter bf = component.getBloomFilter();
+ long[] hashes = new long[2];
+
+ IIndexCursor scanCursor = accessor.createSearchCursor();
+ accessor.search(scanCursor, nullPred);
try {
while (scanCursor.hasNext()) {
scanCursor.next();
+ bf.add(scanCursor.getTuple(), hashes);
bulkLoader.add(scanCursor.getTuple());
}
} finally {
@@ -392,7 +425,7 @@
.getName(), lastFile.getFile().getName());
ILSMIndexAccessorInternal accessor = new LSMBTreeAccessor(lsmHarness, opCtx);
ioScheduler.scheduleOperation(new LSMBTreeMergeOperation(accessor, mergingComponents, cursor, relMergeFileRefs
- .getInsertIndexFileReference(), callback));
+ .getInsertIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), callback));
}
@Override
@@ -401,12 +434,24 @@
LSMBTreeMergeOperation mergeOp = (LSMBTreeMergeOperation) operation;
ITreeIndexCursor cursor = mergeOp.getCursor();
mergedComponents.addAll(mergeOp.getMergingComponents());
- LSMBTreeImmutableComponent mergedBTree = createDiskComponent(componentFactory, mergeOp.getMergeTarget(), true);
+
+ long numElements = 0L;
+ for (int i = 0; i < mergedComponents.size(); ++i) {
+ numElements += ((LSMBTreeImmutableComponent) mergedComponents.get(i)).getBloomFilter().getNumElements();
+ }
+
+ LSMBTreeImmutableComponent mergedBTree = createDiskComponent(componentFactory, mergeOp.getBTreeMergeTarget(),
+ mergeOp.getBloomFilterMergeTarget(), numElements, numHashes, true);
+
+ BloomFilter bf = mergedBTree.getBloomFilter();
+ long[] hashes = new long[2];
+
IIndexBulkLoader bulkLoader = mergedBTree.getBTree().createBulkLoader(1.0f, false);
try {
while (cursor.hasNext()) {
cursor.next();
ITupleReference frameTuple = cursor.getTuple();
+ bf.add(frameTuple, hashes);
bulkLoader.add(frameTuple);
}
} finally {
@@ -417,15 +462,18 @@
}
+ /**
+ * Instantiates a disk component from the given BTree and bloom filter files and activates both structures.
+ *
+ * @param factory factory that builds the component from its file references
+ * @param btreeFileRef file backing the component's BTree
+ * @param bloomFilterFileRef file backing the component's bloom filter
+ * @param numElements element count handed to the bloom filter's create(); only used if createComponent is true
+ * @param numHashes hash count handed to the bloom filter's create(); only used if createComponent is true
+ * @param createComponent if true, create() is called on the BTree and on the bloom filter before activation
+ * @return the component with its BTree and bloom filter activated
+ */
private LSMBTreeImmutableComponent createDiskComponent(LSMBTreeImmutableComponentFactory factory,
- FileReference fileRef, boolean createComponent) throws HyracksDataException, IndexException {
+ FileReference btreeFileRef, FileReference bloomFilterFileRef, long numElements, int numHashes,
+ boolean createComponent) throws HyracksDataException, IndexException {
// Create new BTree instance.
LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) factory
- .createLSMComponentInstance(new LSMComponentFileReferences(fileRef, null));
+ .createLSMComponentInstance(new LSMComponentFileReferences(btreeFileRef, null, bloomFilterFileRef));
if (createComponent) {
component.getBTree().create();
+ component.getBloomFilter().create(numElements, numHashes);
}
// BTree will be closed during cleanup of merge().
component.getBTree().activate();
+ component.getBloomFilter().activate();
return component;
}
@@ -434,25 +482,33 @@
return new LSMBTreeBulkLoader(fillLevel, verifyInput);
}
- private ILSMComponent createBulkLoadTarget() throws HyracksDataException, IndexException {
- LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
- return createDiskComponent(bulkLoadComponentFactory, componentFileRefs.getInsertIndexFileReference(), true);
- }
-
+ /**
+ * Forces the dirty pages of the component's bloom filter and BTree to disk and then marks the BTree as valid.
+ *
+ * @param lsmComponent component to persist; it is cast to LSMBTreeImmutableComponent
+ */
@Override
public void markAsValid(ILSMComponent lsmComponent) throws HyracksDataException {
- BTree btree = ((LSMBTreeImmutableComponent) lsmComponent).getBTree();
- forceFlushDirtyPages(btree);
- markAsValidInternal(btree);
+ // The order in which dirty pages are forced to disk is critical: the bloom filter must always be flushed first.
+ LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) lsmComponent;
+ int fileId = component.getBloomFilter().getFileId();
+ // The bloom filter's pages are flushed through the buffer cache obtained from the BTree.
+ IBufferCache bufferCache = component.getBTree().getBufferCache();
+ int startPage = 0;
+ int maxPage = component.getBloomFilter().getNumPages();
+ forceFlushDirtyPages(bufferCache, fileId, startPage, maxPage);
+ forceFlushDirtyPages(component.getBTree());
+ markAsValidInternal(component.getBTree());
}
public class LSMBTreeBulkLoader implements IIndexBulkLoader {
private final ILSMComponent component;
private final BTreeBulkLoader bulkLoader;
+ private long numElements;
public LSMBTreeBulkLoader(float fillFactor, boolean verifyInput) throws TreeIndexException {
try {
- component = createBulkLoadTarget();
+ numElements = 0L;
+ LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
+ component = (LSMBTreeImmutableComponent) bulkLoadComponentFactory
+ .createLSMComponentInstance(new LSMComponentFileReferences(componentFileRefs
+ .getInsertIndexFileReference(), null, componentFileRefs.getBloomFilterFileReference()));
+ ((LSMBTreeImmutableComponent) component).getBTree().create();
+ ((LSMBTreeImmutableComponent) component).getBTree().activate();
} catch (HyracksDataException e) {
throw new TreeIndexException(e);
} catch (IndexException e) {
@@ -466,6 +522,7 @@
public void add(ITupleReference tuple) throws IndexException, HyracksDataException {
try {
bulkLoader.add(tuple);
+ ++numElements;
} catch (IndexException e) {
handleException();
throw e;
@@ -486,6 +543,25 @@
+ /**
+ * Finishes the bulk load: completes the BTree load, builds the component's bloom filter from the tuples of
+ * the freshly loaded BTree, and hands the component to the LSM harness.
+ */
@Override
public void end() throws HyracksDataException, IndexException {
bulkLoader.end();
+ LSMBTreeImmutableComponent loadedComponent = (LSMBTreeImmutableComponent) component;
+ IIndexAccessor accessor = loadedComponent.getBTree().createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ IIndexCursor scanCursor = accessor.createSearchCursor();
+ // NOTE(review): null low/high keys presumably make this an unbounded range, i.e. a full scan - confirm.
+ RangePredicate nullPred = new RangePredicate(null, null, true, true, null, null);
+ accessor.search(scanCursor, nullPred);
+ try {
+ // The filter can only be sized now that add() has counted the loaded tuples. Creating and activating
+ // it inside the try block guarantees the open cursor is closed even if either call fails.
+ BloomFilter bf = loadedComponent.getBloomFilter();
+ bf.create(numElements, numHashes);
+ bf.activate();
+ // Buffer handed to every bf.add() call.
+ long[] hashes = new long[2];
+ while (scanCursor.hasNext()) {
+ scanCursor.next();
+ bf.add(scanCursor.getTuple(), hashes);
+ }
+ } finally {
+ scanCursor.close();
+ }
lsmHarness.addBulkLoadedComponent(component);
}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
new file mode 100644
index 0000000..61d956d
--- /dev/null
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
@@ -0,0 +1,276 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+/**
+ * File manager for the disk components of an LSM B-Tree. Each component is stored as two files that share a
+ * timestamp-range base name: the B-Tree (suffix {@code "b"}) and its buddy bloom filter (suffix {@code "f"}).
+ */
+public class LSMBTreeFileManager extends AbstractLSMIndexFileManager {
+    private static final String BTREE_STRING = "b";
+    private static final String BLOOM_FILTER_STRING = "f";
+
+    // Accepts non-hidden files whose name ends with the B-Tree suffix.
+    private static final FilenameFilter btreeFilter = new FilenameFilter() {
+        @Override
+        public boolean accept(File dir, String name) {
+            return !name.startsWith(".") && name.endsWith(BTREE_STRING);
+        }
+    };
+
+    // Accepts non-hidden files whose name ends with the bloom filter suffix.
+    private static final FilenameFilter bloomFilterFilter = new FilenameFilter() {
+        @Override
+        public boolean accept(File dir, String name) {
+            return !name.startsWith(".") && name.endsWith(BLOOM_FILTER_STRING);
+        }
+    };
+
+    private final TreeIndexFactory<? extends ITreeIndex> btreeFactory;
+
+    /**
+     * Creates a file manager for an LSM B-Tree.
+     *
+     * @param ioManager forwarded to the superclass
+     * @param fileMapProvider forwarded to the superclass
+     * @param file forwarded to the superclass
+     * @param btreeFactory factory used to instantiate B-Tree files so that they can be validated during cleanup
+     * @param startIODeviceIndex forwarded to the superclass
+     */
+    public LSMBTreeFileManager(IIOManager ioManager, IFileMapProvider fileMapProvider, FileReference file,
+            TreeIndexFactory<? extends ITreeIndex> btreeFactory, int startIODeviceIndex) {
+        super(ioManager, fileMapProvider, file, null, startIODeviceIndex);
+        this.btreeFactory = btreeFactory;
+    }
+
+    /**
+     * Collects the files in this index's directory on {@code dev} that are accepted by {@code filter}.
+     * When {@code treeFactory} is non-null, each file is additionally checked with {@code isValidTreeIndex} and
+     * deleted if the check fails; a null {@code treeFactory} accepts every matching file unchecked.
+     *
+     * @param dev device whose copy of the index directory is listed
+     * @param filter selects the candidate file names
+     * @param treeFactory factory used to open candidates for validation, or null to skip validation
+     * @param allFiles output list that accepted files are appended to
+     */
+    @Override
+    protected void cleanupAndGetValidFilesInternal(IODeviceHandle dev, FilenameFilter filter,
+            TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<ComparableFileName> allFiles)
+            throws HyracksDataException, IndexException {
+        File dir = new File(dev.getPath(), baseDir);
+        String[] files = dir.list(filter);
+        if (files == null) {
+            // File.list() returns null when the directory does not exist or cannot be read.
+            return;
+        }
+        for (String fileName : files) {
+            File file = new File(dir.getPath() + File.separator + fileName);
+            FileReference fileRef = new FileReference(file);
+            if (treeFactory == null || isValidTreeIndex(treeFactory.createIndexInstance(fileRef))) {
+                allFiles.add(new ComparableFileName(fileRef));
+            } else {
+                file.delete();
+            }
+        }
+    }
+
+    /**
+     * Returns the file references for a new flush target: a B-Tree file and a bloom filter file sharing one
+     * base name built from the current timestamp.
+     */
+    @Override
+    public LSMComponentFileReferences getRelFlushFileReference() {
+        Date date = new Date();
+        String ts = formatter.format(date);
+        // Begin timestamp and end timestamp are identical since it is a flush.
+        String baseName = baseDir + ts + SPLIT_STRING + ts;
+        return new LSMComponentFileReferences(createFlushFile(baseName + SPLIT_STRING + BTREE_STRING), null,
+                createFlushFile(baseName + SPLIT_STRING + BLOOM_FILTER_STRING));
+    }
+
+    /**
+     * Returns the file references for a new merge target covering the components from {@code firstFileName}
+     * to {@code lastFileName}.
+     *
+     * @param firstFileName file name of the first component being merged
+     * @param lastFileName file name of the last component being merged
+     */
+    @Override
+    public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName)
+            throws HyracksDataException {
+        String[] firstTimestampRange = firstFileName.split(SPLIT_STRING);
+        String[] lastTimestampRange = lastFileName.split(SPLIT_STRING);
+
+        // Get the range of timestamps by taking the earliest and the latest timestamps.
+        String baseName = baseDir + firstTimestampRange[0] + SPLIT_STRING + lastTimestampRange[1];
+        return new LSMComponentFileReferences(createMergeFile(baseName + SPLIT_STRING + BTREE_STRING), null,
+                createMergeFile(baseName + SPLIT_STRING + BLOOM_FILTER_STRING));
+    }
+
+    // Strips the trailing SPLIT_STRING + type suffix, leaving the name shared by a BTree and its bloom filter.
+    private String componentBaseName(String fileName) {
+        return fileName.substring(0, fileName.lastIndexOf(SPLIT_STRING));
+    }
+
+    /**
+     * Scans every I/O device for this index's component files, deletes the ones that cannot belong to a valid
+     * component, and returns the surviving B-Tree/bloom filter pairs ordered by {@code recencyCmp}.
+     * <p>
+     * A file is deleted when it is a B-Tree that fails validation, a B-Tree or bloom filter without a buddy, or
+     * part of a component whose timestamp interval is contained in the preceding component's interval.
+     *
+     * @return one entry per valid component, pairing its B-Tree file with its bloom filter file
+     * @throws HyracksDataException if the B-Tree and bloom filter files cannot be paired up, or if two
+     *             components have overlapping but not contained timestamp intervals
+     */
+    @Override
+    public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException, IndexException {
+        List<LSMComponentFileReferences> validFiles = new ArrayList<LSMComponentFileReferences>();
+        ArrayList<ComparableFileName> allBTreeFiles = new ArrayList<ComparableFileName>();
+        ArrayList<ComparableFileName> allBloomFilterFiles = new ArrayList<ComparableFileName>();
+
+        // Gather the bloom filter files from ALL devices before looking at any BTree. The two files of one
+        // component are created by separate createFlushFile/createMergeFile calls and are not guaranteed to
+        // share a device, so a per-device buddy check could delete a BTree whose buddy lives elsewhere.
+        for (IODeviceHandle dev : ioManager.getIODevices()) {
+            cleanupAndGetValidFilesInternal(dev, bloomFilterFilter, null, allBloomFilterFiles);
+        }
+        HashSet<String> bloomFilterFilesSet = new HashSet<String>();
+        for (ComparableFileName cmpFileName : allBloomFilterFiles) {
+            bloomFilterFilesSet.add(componentBaseName(cmpFileName.fileName));
+        }
+
+        // Gather the valid BTree files and look for their buddy bloom filters.
+        // If no buddy is found, delete the file, otherwise add the BTree to allBTreeFiles.
+        HashSet<String> btreeFilesSet = new HashSet<String>();
+        for (IODeviceHandle dev : ioManager.getIODevices()) {
+            ArrayList<ComparableFileName> tmpAllBTreeFiles = new ArrayList<ComparableFileName>();
+            cleanupAndGetValidFilesInternal(dev, btreeFilter, btreeFactory, tmpAllBTreeFiles);
+            for (ComparableFileName cmpFileName : tmpAllBTreeFiles) {
+                String baseName = componentBaseName(cmpFileName.fileName);
+                if (bloomFilterFilesSet.contains(baseName)) {
+                    allBTreeFiles.add(cmpFileName);
+                    btreeFilesSet.add(baseName);
+                } else {
+                    // Couldn't find the corresponding bloom filter file; thus, delete the BTree file.
+                    new File(cmpFileName.fullPath).delete();
+                }
+            }
+        }
+
+        // Conversely, delete bloom filters whose BTree is missing or was just removed as invalid; otherwise a
+        // single leftover bloom filter would make the sanity check below fail on every cleanup.
+        Iterator<ComparableFileName> orphanIter = allBloomFilterFiles.iterator();
+        while (orphanIter.hasNext()) {
+            ComparableFileName cmpFileName = orphanIter.next();
+            if (!btreeFilesSet.contains(componentBaseName(cmpFileName.fileName))) {
+                new File(cmpFileName.fullPath).delete();
+                orphanIter.remove();
+            }
+        }
+
+        // Sanity check.
+        if (allBTreeFiles.size() != allBloomFilterFiles.size()) {
+            throw new HyracksDataException(
+                    "Unequal number of valid BTree and bloom filter files found. Aborting cleanup.");
+        }
+
+        // Trivial cases.
+        if (allBTreeFiles.isEmpty() || allBloomFilterFiles.isEmpty()) {
+            return validFiles;
+        }
+
+        if (allBTreeFiles.size() == 1 && allBloomFilterFiles.size() == 1) {
+            // The bloom filter is the THIRD constructor argument; the second one is the delete index.
+            validFiles.add(new LSMComponentFileReferences(allBTreeFiles.get(0).fileRef, null,
+                    allBloomFilterFiles.get(0).fileRef));
+            return validFiles;
+        }
+
+        // Sorts files names from earliest to latest timestamp.
+        Collections.sort(allBTreeFiles);
+        Collections.sort(allBloomFilterFiles);
+
+        List<ComparableFileName> validComparableBTreeFiles = new ArrayList<ComparableFileName>();
+        ComparableFileName lastBTree = allBTreeFiles.get(0);
+        validComparableBTreeFiles.add(lastBTree);
+
+        List<ComparableFileName> validComparableBloomFilterFiles = new ArrayList<ComparableFileName>();
+        ComparableFileName lastBloomFilter = allBloomFilterFiles.get(0);
+        validComparableBloomFilterFiles.add(lastBloomFilter);
+
+        for (int i = 1; i < allBTreeFiles.size(); i++) {
+            ComparableFileName currentBTree = allBTreeFiles.get(i);
+            ComparableFileName currentBloomFilter = allBloomFilterFiles.get(i);
+            // Current start timestamp is greater than last stop timestamp.
+            if (currentBTree.interval[0].compareTo(lastBTree.interval[1]) > 0
+                    && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[1]) > 0) {
+                validComparableBTreeFiles.add(currentBTree);
+                validComparableBloomFilterFiles.add(currentBloomFilter);
+                lastBTree = currentBTree;
+                lastBloomFilter = currentBloomFilter;
+            } else if (currentBTree.interval[0].compareTo(lastBTree.interval[0]) >= 0
+                    && currentBTree.interval[1].compareTo(lastBTree.interval[1]) <= 0
+                    && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[0]) >= 0
+                    && currentBloomFilter.interval[1].compareTo(lastBloomFilter.interval[1]) <= 0) {
+                // Invalid files are completely contained in last interval.
+                File invalidBTreeFile = new File(currentBTree.fullPath);
+                invalidBTreeFile.delete();
+                File invalidBloomFilterFile = new File(currentBloomFilter.fullPath);
+                invalidBloomFilterFile.delete();
+            } else {
+                // This scenario should not be possible.
+                throw new HyracksDataException("Found LSM files with overlapping but not contained timetamp intervals.");
+            }
+        }
+
+        // Sort valid files in reverse lexicographical order, such that newer
+        // files come first.
+        Collections.sort(validComparableBTreeFiles, recencyCmp);
+        Collections.sort(validComparableBloomFilterFiles, recencyCmp);
+
+        Iterator<ComparableFileName> btreeFileIter = validComparableBTreeFiles.iterator();
+        Iterator<ComparableFileName> bloomFilterFileIter = validComparableBloomFilterFiles.iterator();
+        while (btreeFileIter.hasNext() && bloomFilterFileIter.hasNext()) {
+            ComparableFileName cmpBTreeFileName = btreeFileIter.next();
+            ComparableFileName cmpBloomFilterFileName = bloomFilterFileIter.next();
+            validFiles.add(new LSMComponentFileReferences(cmpBTreeFileName.fileRef, null,
+                    cmpBloomFilterFileName.fileRef));
+        }
+
+        return validFiles;
+    }
+}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
new file mode 100644
index 0000000..dfda07b
--- /dev/null
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
@@ -0,0 +1,85 @@
+package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
+
+/**
+ * I/O operation that flushes an LSM B-Tree component; it carries the flush targets for both the BTree and
+ * the bloom filter.
+ */
+public class LSMBTreeFlushOperation implements ILSMIOOperation {
+
+    private final ILSMIndexAccessorInternal accessor;
+    private final ILSMComponent flushingComponent;
+    private final FileReference btreeFlushTarget;
+    private final FileReference bloomFilterFlushTarget;
+    private final ILSMIOOperationCallback callback;
+
+    /**
+     * @param accessor accessor whose {@code flush} is invoked by {@link #perform()}
+     * @param flushingComponent component being flushed
+     * @param btreeFlushTarget file the BTree is flushed to
+     * @param bloomFilterFlushTarget file the bloom filter is flushed to
+     * @param callback callback associated with this operation
+     */
+    public LSMBTreeFlushOperation(ILSMIndexAccessorInternal accessor, ILSMComponent flushingComponent,
+            FileReference btreeFlushTarget, FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback) {
+        this.callback = callback;
+        this.bloomFilterFlushTarget = bloomFilterFlushTarget;
+        this.btreeFlushTarget = btreeFlushTarget;
+        this.flushingComponent = flushingComponent;
+        this.accessor = accessor;
+    }
+
+    /** A flush declares no read devices. */
+    @Override
+    public Set<IODeviceHandle> getReadDevices() {
+        return Collections.emptySet();
+    }
+
+    /** Returns the device handles of the BTree and bloom filter flush targets. */
+    @Override
+    public Set<IODeviceHandle> getWriteDevices() {
+        Set<IODeviceHandle> targetDevices = new HashSet<IODeviceHandle>();
+        targetDevices.add(btreeFlushTarget.getDeviceHandle());
+        targetDevices.add(bloomFilterFlushTarget.getDeviceHandle());
+        return targetDevices;
+    }
+
+    /** Delegates the flush to the accessor, passing this operation along. */
+    @Override
+    public void perform() throws HyracksDataException, IndexException {
+        accessor.flush(this);
+    }
+
+    @Override
+    public ILSMIOOperationCallback getCallback() {
+        return callback;
+    }
+
+    public ILSMIndexAccessorInternal getAccessor() {
+        return accessor;
+    }
+
+    public ILSMComponent getFlushingComponent() {
+        return flushingComponent;
+    }
+
+    public FileReference getBTreeFlushTarget() {
+        return btreeFlushTarget;
+    }
+
+    public FileReference getBloomFilterFlushTarget() {
+        return bloomFilterFlushTarget;
+    }
+}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponent.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponent.java
index 2251a49..7b5e95d 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponent.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponent.java
@@ -1,24 +1,33 @@
package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilter;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractImmutableLSMComponent;
+/**
+ * Immutable LSM B-Tree component that pairs a BTree with its bloom filter.
+ */
public class LSMBTreeImmutableComponent extends AbstractImmutableLSMComponent {
private final BTree btree;
+ private final BloomFilter bloomFilter;
- public LSMBTreeImmutableComponent(BTree btree) {
+ /**
+ * @param btree the component's BTree
+ * @param bloomFilter the bloom filter belonging to the same component
+ */
+ public LSMBTreeImmutableComponent(BTree btree, BloomFilter bloomFilter) {
this.btree = btree;
+ this.bloomFilter = bloomFilter;
}
+ // Deactivates and destroys the BTree first, then the bloom filter.
@Override
public void destroy() throws HyracksDataException {
btree.deactivate();
btree.destroy();
+ bloomFilter.deactivate();
+ bloomFilter.destroy();
}
public BTree getBTree() {
return btree;
}
+
+ // Returns the bloom filter that was passed to the constructor.
+ public BloomFilter getBloomFilter() {
+ return bloomFilter;
+ }
}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponentFactory.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponentFactory.java
index 696fc2c..998072f 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponentFactory.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponentFactory.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -25,14 +27,18 @@
public class LSMBTreeImmutableComponentFactory implements ILSMComponentFactory {
private final TreeIndexFactory<BTree> btreeFactory;
+ private final BloomFilterFactory bloomFilterFactory;
- public LSMBTreeImmutableComponentFactory(TreeIndexFactory<BTree> btreeFactory) {
+ /**
+ * @param btreeFactory factory for the BTree of each created component
+ * @param bloomFilterFactory factory for the bloom filter of each created component
+ */
+ public LSMBTreeImmutableComponentFactory(TreeIndexFactory<BTree> btreeFactory, BloomFilterFactory bloomFilterFactory) {
this.btreeFactory = btreeFactory;
+ this.bloomFilterFactory = bloomFilterFactory;
}
+ /**
+ * Builds an immutable component whose BTree is created from the insert-index file reference of {@code cfr}
+ * and whose bloom filter is created from its bloom filter file reference.
+ */
@Override
- public ILSMComponent createLSMComponentInstance(LSMComponentFileReferences cfr) throws IndexException {
- return new LSMBTreeImmutableComponent(btreeFactory.createIndexInstance(cfr.getInsertIndexFileReference()));
+ public ILSMComponent createLSMComponentInstance(LSMComponentFileReferences cfr) throws IndexException,
+ HyracksDataException {
+ return new LSMBTreeImmutableComponent(btreeFactory.createIndexInstance(cfr.getInsertIndexFileReference()),
+ bloomFilterFactory.createBloomFiltertInstance(cfr.getBloomFilterFileReference()));
}
@Override
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
index a3a7097..ddfe365 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
@@ -15,7 +15,6 @@
package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -35,15 +34,18 @@
private final ILSMIndexAccessorInternal accessor;
private final List<ILSMComponent> mergingComponents;
private final ITreeIndexCursor cursor;
- private final FileReference mergeTarget;
+ private final FileReference btreeMergeTarget;
+ private final FileReference bloomFilterMergeTarget;
private final ILSMIOOperationCallback callback;
+ /**
+ * @param accessor accessor associated with this merge
+ * @param mergingComponents components being merged
+ * @param cursor cursor associated with this merge
+ * @param btreeMergeTarget file that receives the merged BTree
+ * @param bloomFilterMergeTarget file that receives the merged component's bloom filter
+ * @param callback callback associated with this operation
+ */
public LSMBTreeMergeOperation(ILSMIndexAccessorInternal accessor, List<ILSMComponent> mergingComponents,
- ITreeIndexCursor cursor, FileReference mergeTarget, ILSMIOOperationCallback callback) {
+ ITreeIndexCursor cursor, FileReference btreeMergeTarget, FileReference bloomFilterMergeTarget,
+ ILSMIOOperationCallback callback) {
this.accessor = accessor;
this.mergingComponents = mergingComponents;
this.cursor = cursor;
- this.mergeTarget = mergeTarget;
+ this.btreeMergeTarget = btreeMergeTarget;
+ this.bloomFilterMergeTarget = bloomFilterMergeTarget;
this.callback = callback;
}
@@ -59,7 +61,10 @@
+ // Returns the device handles of both merge targets: the BTree file and the bloom filter file.
@Override
public Set<IODeviceHandle> getWriteDevices() {
- return Collections.singleton(mergeTarget.getDeviceHandle());
+ Set<IODeviceHandle> devs = new HashSet<IODeviceHandle>();
+ devs.add(btreeMergeTarget.getDeviceHandle());
+ devs.add(bloomFilterMergeTarget.getDeviceHandle());
+ return devs;
}
@Override
@@ -72,8 +77,12 @@
return callback;
}
- public FileReference getMergeTarget() {
- return mergeTarget;
+ // Returns the file that receives the merged BTree.
+ public FileReference getBTreeMergeTarget() {
+ return btreeMergeTarget;
+ }
+
+ // Returns the file that receives the merged component's bloom filter.
+ public FileReference getBloomFilterMergeTarget() {
+ return bloomFilterMergeTarget;
}
public ITreeIndexCursor getCursor() {
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java
index 3706230..154ecf6 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java
@@ -19,6 +19,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
@@ -29,6 +30,7 @@
import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.freepage.LinkedListFreePageManagerFactory;
import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTreeFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.btree.tuples.LSMBTreeCopyTupleWriterFactory;
import edu.uci.ics.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleWriterFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IInMemoryBufferCache;
@@ -38,7 +40,6 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -80,13 +81,20 @@
typeTraits.length);
TreeIndexFactory<BTree> bulkLoadBTreeFactory = new BTreeFactory(diskBufferCache, diskFileMapProvider,
freePageManagerFactory, interiorFrameFactory, insertLeafFrameFactory, cmpFactories, typeTraits.length);
- ILSMIndexFileManager fileNameManager = new LSMIndexFileManager(ioManager, diskFileMapProvider, file,
+
+ int keyFields[] = new int[cmpFactories.length];
+ for (int i = 0; i < cmpFactories.length; i++) {
+ keyFields[i] = i;
+ }
+ BloomFilterFactory bloomFilterFactory = new BloomFilterFactory(diskBufferCache, diskFileMapProvider, keyFields);
+
+ ILSMIndexFileManager fileNameManager = new LSMBTreeFileManager(ioManager, diskFileMapProvider, file,
diskBTreeFactory, startIODeviceIndex);
LSMBTree lsmTree = new LSMBTree(memBufferCache, memFreePageManager, interiorFrameFactory,
insertLeafFrameFactory, deleteLeafFrameFactory, fileNameManager, diskBTreeFactory,
- bulkLoadBTreeFactory, diskFileMapProvider, typeTraits.length, cmpFactories, mergePolicy,
- opTrackerFactory, ioScheduler, ioOpCallbackProvider);
+ bulkLoadBTreeFactory, bloomFilterFactory, diskFileMapProvider, typeTraits.length, cmpFactories,
+ mergePolicy, opTrackerFactory, ioScheduler, ioOpCallbackProvider);
return lsmTree;
}
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponentFactory.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponentFactory.java
index d00b805..1f3a2b7 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponentFactory.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponentFactory.java
@@ -1,11 +1,13 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+/**
+ * Factory for LSM components.
+ */
public interface ILSMComponentFactory {
- public ILSMComponent createLSMComponentInstance(LSMComponentFileReferences cfr) throws IndexException;
+ /**
+ * Creates a component from the file references in {@code cfr}.
+ */
+ public ILSMComponent createLSMComponentInstance(LSMComponentFileReferences cfr) throws IndexException,
+ HyracksDataException;
public IBufferCache getBufferCache();
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index 3d2ebcd..e2bd77c 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -48,7 +48,6 @@
protected final IFileMapProvider fileMapProvider;
// baseDir should reflect dataset name and partition name.
- protected FileReference file;
protected String baseDir;
protected final Format formatter = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS");
protected final Comparator<String> cmp = new FileNameComparator();
@@ -61,7 +60,6 @@
public AbstractLSMIndexFileManager(IIOManager ioManager, IFileMapProvider fileMapProvider, FileReference file,
TreeIndexFactory<? extends ITreeIndex> treeFactory, int startIODeviceIndex) {
- this.file = file;
this.baseDir = file.getFile().getPath();
if (!baseDir.endsWith(System.getProperty("file.separator"))) {
baseDir += System.getProperty("file.separator");
@@ -100,9 +98,21 @@
}
}
- abstract protected void cleanupAndGetValidFilesInternal(IODeviceHandle dev, FilenameFilter filter,
+ /**
+ * Collects the files in this index's directory on {@code dev} that are accepted by {@code filter}.
+ * When {@code treeFactory} is non-null, each file is additionally checked with {@code isValidTreeIndex} and
+ * deleted if the check fails; a null {@code treeFactory} accepts every matching file unchecked.
+ *
+ * @param dev device whose copy of the index directory is listed
+ * @param filter selects the candidate file names
+ * @param treeFactory factory used to open candidates for validation, or null to skip validation
+ * @param allFiles output list that accepted files are appended to
+ */
+ protected void cleanupAndGetValidFilesInternal(IODeviceHandle dev, FilenameFilter filter,
TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<ComparableFileName> allFiles)
- throws HyracksDataException, IndexException;
+ throws HyracksDataException, IndexException {
+ File dir = new File(dev.getPath(), baseDir);
+ String[] files = dir.list(filter);
+ if (files == null) {
+ // File.list() returns null when the directory does not exist or cannot be read.
+ return;
+ }
+ for (String fileName : files) {
+ File file = new File(dir.getPath() + File.separator + fileName);
+ FileReference fileRef = new FileReference(file);
+ // A null factory leaves nothing to validate against, so the file is accepted as-is.
+ if (treeFactory == null || isValidTreeIndex(treeFactory.createIndexInstance(fileRef))) {
+ allFiles.add(new ComparableFileName(fileRef));
+ } else {
+ file.delete();
+ }
+ }
+ }
@Override
public void createDirs() {
@@ -145,7 +155,7 @@
Date date = new Date();
String ts = formatter.format(date);
// Begin timestamp and end timestamp are identical since it is a flush
- return new LSMComponentFileReferences(createFlushFile(baseDir + ts + SPLIT_STRING + ts), null);
+ return new LSMComponentFileReferences(createFlushFile(baseDir + ts + SPLIT_STRING + ts), null, null);
}
@Override
@@ -155,7 +165,7 @@
String[] lastTimestampRange = lastFileName.split(SPLIT_STRING);
// Get the range of timestamps by taking the earliest and the latest timestamps
return new LSMComponentFileReferences(createMergeFile(baseDir + firstTimestampRange[0] + SPLIT_STRING
- + lastTimestampRange[1]), null);
+ + lastTimestampRange[1]), null, null);
}
@Override
@@ -177,7 +187,7 @@
}
if (allFiles.size() == 1) {
- validFiles.add(new LSMComponentFileReferences(allFiles.get(0).fileRef, null));
+ validFiles.add(new LSMComponentFileReferences(allFiles.get(0).fileRef, null, null));
return validFiles;
}
@@ -209,7 +219,7 @@
// Sort valid files in reverse lexicographical order, such that newer files come first.
Collections.sort(validComparableFiles, recencyCmp);
for (ComparableFileName cmpFileName : validComparableFiles) {
- validFiles.add(new LSMComponentFileReferences(cmpFileName.fileRef, null));
+ validFiles.add(new LSMComponentFileReferences(cmpFileName.fileRef, null, null));
}
return validFiles;
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java
index ac6ddcf..019dca4 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java
@@ -24,9 +24,14 @@
// This FileReference for the delete index (if any). For example, this will be the the FileReference of the buddy BTree in one component of the LSM-RTree.
private final FileReference deleteIndexFileReference;
- public LSMComponentFileReferences(FileReference insertIndexFileReference, FileReference deleteIndexFileReference) {
+ // The FileReference for the bloom filter (if any).
+ private final FileReference bloomFilterFileReference;
+
+ /**
+ * @param insertIndexFileReference FileReference for the insert index
+ * @param deleteIndexFileReference FileReference for the delete index (if any)
+ * @param bloomFilterFileReference FileReference for the bloom filter (if any)
+ */
+ public LSMComponentFileReferences(FileReference insertIndexFileReference, FileReference deleteIndexFileReference,
+ FileReference bloomFilterFileReference) {
this.insertIndexFileReference = insertIndexFileReference;
this.deleteIndexFileReference = deleteIndexFileReference;
+ this.bloomFilterFileReference = bloomFilterFileReference;
}
public FileReference getInsertIndexFileReference() {
@@ -36,4 +41,8 @@
public FileReference getDeleteIndexFileReference() {
return deleteIndexFileReference;
}
+
+ // Returns the FileReference for the bloom filter as passed to the constructor (may be null if there is none).
+ public FileReference getBloomFilterFileReference() {
+ return bloomFilterFileReference;
+ }
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMFlushOperation.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMFlushOperation.java
deleted file mode 100644
index 6ce1f08..0000000
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMFlushOperation.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-
-import java.util.Collections;
-import java.util.Set;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.io.IODeviceHandle;
-import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
-
-public class LSMFlushOperation implements ILSMIOOperation {
-
- private final ILSMIndexAccessorInternal accessor;
- private final ILSMComponent flushingComponent;
- private final FileReference flushTarget;
- private final ILSMIOOperationCallback callback;
-
- public LSMFlushOperation(ILSMIndexAccessorInternal accessor, ILSMComponent flushingComponent,
- FileReference flushTarget, ILSMIOOperationCallback callback) {
- this.accessor = accessor;
- this.flushingComponent = flushingComponent;
- this.flushTarget = flushTarget;
- this.callback = callback;
- }
-
- @Override
- public Set<IODeviceHandle> getReadDevices() {
- return Collections.emptySet();
- }
-
- @Override
- public Set<IODeviceHandle> getWriteDevices() {
- return Collections.singleton(flushTarget.getDeviceHandle());
- }
-
- @Override
- public void perform() throws HyracksDataException, IndexException {
- accessor.flush(this);
- }
-
- @Override
- public ILSMIOOperationCallback getCallback() {
- return callback;
- }
-
- public FileReference getFlushTarget() {
- return flushTarget;
- }
-
- public ILSMIndexAccessorInternal getAccessor() {
- return accessor;
- }
-
- public ILSMComponent getFlushingComponent() {
- return flushingComponent;
- }
-}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexFileManager.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexFileManager.java
deleted file mode 100644
index a7a16ac..0000000
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexFileManager.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.util.ArrayList;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.io.IIOManager;
-import edu.uci.ics.hyracks.api.io.IODeviceHandle;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
-import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
-
-public class LSMIndexFileManager extends AbstractLSMIndexFileManager {
-
- public LSMIndexFileManager(IIOManager ioManager, IFileMapProvider fileMapProvider, FileReference file,
- TreeIndexFactory<? extends ITreeIndex> treeFactory, int startIODeviceIndex) {
- super(ioManager, fileMapProvider, file, treeFactory, startIODeviceIndex);
- }
-
- protected void cleanupAndGetValidFilesInternal(IODeviceHandle dev, FilenameFilter filter,
- TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<ComparableFileName> allFiles)
- throws HyracksDataException, IndexException {
- File dir = new File(dev.getPath(), baseDir);
- String[] files = dir.list(filter);
- for (String fileName : files) {
- File file = new File(dir.getPath() + File.separator + fileName);
- FileReference fileRef = new FileReference(file);
- if (isValidTreeIndex(treeFactory.createIndexInstance(fileRef))) {
- allFiles.add(new ComparableFileName(fileRef));
- } else {
- file.delete();
- }
- }
- }
-}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index ced1aa9..ca0e091f 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -574,7 +574,7 @@
FileReference dictBTreeFileRef, FileReference btreeFileRef, boolean create) throws HyracksDataException,
IndexException {
LSMInvertedIndexImmutableComponent component = (LSMInvertedIndexImmutableComponent) factory
- .createLSMComponentInstance(new LSMComponentFileReferences(dictBTreeFileRef, btreeFileRef));
+ .createLSMComponentInstance(new LSMComponentFileReferences(dictBTreeFileRef, btreeFileRef, null));
if (create) {
component.getInvIndex().create();
component.getDeletedKeysBTree().create();
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
index 8ffb0bd..21a11dc 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
@@ -29,14 +29,14 @@
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.api.io.IODeviceHandle;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexFileNameMapper;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
// TODO: Refactor for better code sharing with other file managers.
-public class LSMInvertedIndexFileManager extends LSMIndexFileManager implements IInvertedIndexFileNameMapper {
+public class LSMInvertedIndexFileManager extends AbstractLSMIndexFileManager implements IInvertedIndexFileNameMapper {
private static final String DICT_BTREE_SUFFIX = "b";
private static final String INVLISTS_SUFFIX = "i";
private static final String DELETED_KEYS_BTREE_SUFFIX = "d";
@@ -69,7 +69,7 @@
String baseName = baseDir + ts + SPLIT_STRING + ts;
// Begin timestamp and end timestamp are identical since it is a flush
return new LSMComponentFileReferences(createFlushFile(baseName + SPLIT_STRING + DICT_BTREE_SUFFIX),
- createFlushFile(baseName + SPLIT_STRING + DELETED_KEYS_BTREE_SUFFIX));
+ createFlushFile(baseName + SPLIT_STRING + DELETED_KEYS_BTREE_SUFFIX), null);
}
@Override
@@ -81,7 +81,7 @@
String baseName = baseDir + firstTimestampRange[0] + SPLIT_STRING + lastTimestampRange[1];
// Get the range of timestamps by taking the earliest and the latest timestamps
return new LSMComponentFileReferences(createMergeFile(baseName + SPLIT_STRING + DICT_BTREE_SUFFIX),
- createMergeFile(baseName + SPLIT_STRING + DELETED_KEYS_BTREE_SUFFIX));
+ createMergeFile(baseName + SPLIT_STRING + DELETED_KEYS_BTREE_SUFFIX), null);
}
@Override
@@ -132,7 +132,7 @@
if (allDictBTreeFiles.size() == 1 && allDeletedKeysBTreeFiles.size() == 1) {
validFiles.add(new LSMComponentFileReferences(allDictBTreeFiles.get(0).fileRef, allDeletedKeysBTreeFiles
- .get(0).fileRef));
+ .get(0).fileRef, null));
return validFiles;
}
@@ -183,7 +183,8 @@
while (dictBTreeFileIter.hasNext() && deletedKeysBTreeIter.hasNext()) {
ComparableFileName cmpDictBTreeFile = dictBTreeFileIter.next();
ComparableFileName cmpDeletedKeysBTreeFile = deletedKeysBTreeIter.next();
- validFiles.add(new LSMComponentFileReferences(cmpDictBTreeFile.fileRef, cmpDeletedKeysBTreeFile.fileRef));
+ validFiles.add(new LSMComponentFileReferences(cmpDictBTreeFile.fileRef, cmpDeletedKeysBTreeFile.fileRef,
+ null));
}
return validFiles;
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index 6cafbbb..cacac12 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -228,7 +228,7 @@
FileReference deleteFileRef, boolean createComponent) throws HyracksDataException, IndexException {
// Create new tree instance.
LSMRTreeImmutableComponent component = (LSMRTreeImmutableComponent) factory
- .createLSMComponentInstance(new LSMComponentFileReferences(insertFileRef, deleteFileRef));
+ .createLSMComponentInstance(new LSMComponentFileReferences(insertFileRef, deleteFileRef, null));
if (createComponent) {
component.getRTree().create();
if (component.getBTree() != null) {
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 39cd5d6..8ce3dc9 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -207,7 +207,7 @@
@Override
public ILSMComponent flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
LSMRTreeFlushOperation flushOp = (LSMRTreeFlushOperation) operation;
- LSMRTreeMutableComponent flushingComponent = flushOp.getFlushingComponent();
+ LSMRTreeMutableComponent flushingComponent = (LSMRTreeMutableComponent) flushOp.getFlushingComponent();
// Renaming order is critical because we use assume ordering when we
// read the file names when we open the tree.
// The RTree should be renamed before the BTree.
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
index 7ceecad..041fda1 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
@@ -30,12 +30,12 @@
import edu.uci.ics.hyracks.api.io.IODeviceHandle;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
-public class LSMRTreeFileManager extends LSMIndexFileManager {
+public class LSMRTreeFileManager extends AbstractLSMIndexFileManager {
private static final String RTREE_STRING = "r";
private static final String BTREE_STRING = "b";
@@ -69,7 +69,7 @@
String baseName = baseDir + ts + SPLIT_STRING + ts;
// Begin timestamp and end timestamp are identical since it is a flush
return new LSMComponentFileReferences(createFlushFile(baseName + SPLIT_STRING + RTREE_STRING),
- createFlushFile(baseName + SPLIT_STRING + BTREE_STRING));
+ createFlushFile(baseName + SPLIT_STRING + BTREE_STRING), null);
}
@Override
@@ -81,7 +81,7 @@
String baseName = baseDir + firstTimestampRange[0] + SPLIT_STRING + lastTimestampRange[1];
// Get the range of timestamps by taking the earliest and the latest timestamps
return new LSMComponentFileReferences(createMergeFile(baseName + SPLIT_STRING + RTREE_STRING),
- createMergeFile(baseName + SPLIT_STRING + BTREE_STRING));
+ createMergeFile(baseName + SPLIT_STRING + BTREE_STRING), null);
}
@Override
@@ -127,7 +127,8 @@
}
if (allRTreeFiles.size() == 1 && allBTreeFiles.size() == 1) {
- validFiles.add(new LSMComponentFileReferences(allRTreeFiles.get(0).fileRef, allBTreeFiles.get(0).fileRef));
+ validFiles.add(new LSMComponentFileReferences(allRTreeFiles.get(0).fileRef, allBTreeFiles.get(0).fileRef,
+ null));
return validFiles;
}
@@ -178,7 +179,7 @@
while (rtreeFileIter.hasNext() && btreeFileIter.hasNext()) {
ComparableFileName cmpRTreeFileName = rtreeFileIter.next();
ComparableFileName cmpBTreeFileName = btreeFileIter.next();
- validFiles.add(new LSMComponentFileReferences(cmpRTreeFileName.fileRef, cmpBTreeFileName.fileRef));
+ validFiles.add(new LSMComponentFileReferences(cmpRTreeFileName.fileRef, cmpBTreeFileName.fileRef, null));
}
return validFiles;
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
index 8698a1d..8c5c6fe 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
@@ -8,6 +8,7 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IODeviceHandle;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
@@ -15,12 +16,12 @@
public class LSMRTreeFlushOperation implements ILSMIOOperation {
private final ILSMIndexAccessorInternal accessor;
- private final LSMRTreeMutableComponent flushingComponent;
+ private final ILSMComponent flushingComponent;
private final FileReference rtreeFlushTarget;
private final FileReference btreeFlushTarget;
private final ILSMIOOperationCallback callback;
- public LSMRTreeFlushOperation(ILSMIndexAccessorInternal accessor, LSMRTreeMutableComponent flushingComponent,
+ public LSMRTreeFlushOperation(ILSMIndexAccessorInternal accessor, ILSMComponent flushingComponent,
FileReference rtreeFlushTarget, FileReference btreeFlushTarget, ILSMIOOperationCallback callback) {
this.accessor = accessor;
this.flushingComponent = flushingComponent;
@@ -38,7 +39,9 @@
public Set<IODeviceHandle> getWriteDevices() {
Set<IODeviceHandle> devs = new HashSet<IODeviceHandle>();
devs.add(rtreeFlushTarget.getDeviceHandle());
- devs.add(btreeFlushTarget.getDeviceHandle());
+ if (btreeFlushTarget != null) {
+ devs.add(btreeFlushTarget.getDeviceHandle());
+ }
return devs;
}
@@ -60,7 +63,7 @@
return btreeFlushTarget;
}
- public LSMRTreeMutableComponent getFlushingComponent() {
+ public ILSMComponent getFlushingComponent() {
return flushingComponent;
}
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
index 970b253..d6a8693 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
@@ -1,6 +1,5 @@
package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -40,14 +39,21 @@
for (ILSMComponent o : mergingComponents) {
LSMRTreeImmutableComponent component = (LSMRTreeImmutableComponent) o;
devs.add(component.getRTree().getFileReference().getDeviceHandle());
- devs.add(component.getBTree().getFileReference().getDeviceHandle());
+ if (component.getBTree() != null) {
+ devs.add(component.getBTree().getFileReference().getDeviceHandle());
+ }
}
return devs;
}
@Override
public Set<IODeviceHandle> getWriteDevices() {
- return Collections.singleton(rtreeMergeTarget.getDeviceHandle());
+ Set<IODeviceHandle> devs = new HashSet<IODeviceHandle>();
+ devs.add(rtreeMergeTarget.getDeviceHandle());
+ if (btreeMergeTarget != null) {
+ devs.add(btreeMergeTarget.getDeviceHandle());
+ }
+ return devs;
}
@Override
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 47df7df..6c8fe88 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -52,7 +52,6 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMFlushOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
@@ -205,13 +204,13 @@
opCtx.setOperation(IndexOperation.FLUSH);
opCtx.getComponentHolder().add(flushingComponent);
ILSMIndexAccessorInternal accessor = new LSMRTreeWithAntiMatterTuplesAccessor(lsmHarness, opCtx);
- ioScheduler.scheduleOperation(new LSMFlushOperation(accessor, flushingComponent, relFlushFileRefs
- .getInsertIndexFileReference(), callback));
+ ioScheduler.scheduleOperation(new LSMRTreeFlushOperation(accessor, flushingComponent, relFlushFileRefs
+ .getInsertIndexFileReference(), null, callback));
}
@Override
public ILSMComponent flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
- LSMFlushOperation flushOp = (LSMFlushOperation) operation;
+ LSMRTreeFlushOperation flushOp = (LSMRTreeFlushOperation) operation;
// Renaming order is critical because we use assume ordering when we
// read the file names when we open the tree.
// The RTree should be renamed before the BTree.
@@ -221,8 +220,8 @@
RTreeSearchCursor rtreeScanCursor = (RTreeSearchCursor) memRTreeAccessor.createSearchCursor();
SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
- LSMRTreeImmutableComponent component = createDiskComponent(componentFactory, flushOp.getFlushTarget(), null,
- true);
+ LSMRTreeImmutableComponent component = createDiskComponent(componentFactory, flushOp.getRTreeFlushTarget(),
+ null, true);
RTree diskRTree = component.getRTree();
// scan the memory BTree
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFileManager.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFileManager.java
new file mode 100644
index 0000000..10b982f
--- /dev/null
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFileManager.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+public class LSMRTreeWithAntiMatterTuplesFileManager extends AbstractLSMIndexFileManager {
+
+ private final TreeIndexFactory<? extends ITreeIndex> rtreeFactory;
+
+ public LSMRTreeWithAntiMatterTuplesFileManager(IIOManager ioManager, IFileMapProvider fileMapProvider,
+ FileReference file, TreeIndexFactory<? extends ITreeIndex> rtreeFactory, int startIODeviceIndex) {
+ super(ioManager, fileMapProvider, file, null, startIODeviceIndex);
+ this.rtreeFactory = rtreeFactory;
+ }
+
+ @Override
+ public LSMComponentFileReferences getRelFlushFileReference() {
+ Date date = new Date();
+ String ts = formatter.format(date);
+ // Begin timestamp and end timestamp are identical since it is a flush
+ return new LSMComponentFileReferences(createFlushFile(baseDir + ts + SPLIT_STRING + ts), null, null);
+ }
+
+ @Override
+ public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName)
+ throws HyracksDataException {
+ String[] firstTimestampRange = firstFileName.split(SPLIT_STRING);
+ String[] lastTimestampRange = lastFileName.split(SPLIT_STRING);
+ // Get the range of timestamps by taking the earliest and the latest timestamps
+ return new LSMComponentFileReferences(createMergeFile(baseDir + firstTimestampRange[0] + SPLIT_STRING
+ + lastTimestampRange[1]), null, null);
+ }
+
+ private static FilenameFilter fileNameFilter = new FilenameFilter() {
+ public boolean accept(File dir, String name) {
+ return !name.startsWith(".");
+ }
+ };
+
+ @Override
+ public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException, IndexException {
+ List<LSMComponentFileReferences> validFiles = new ArrayList<LSMComponentFileReferences>();
+ ArrayList<ComparableFileName> allFiles = new ArrayList<ComparableFileName>();
+
+ // Gather files from all IODeviceHandles and delete invalid files
+ // There are two types of invalid files:
+ // (1) The isValid flag is not set
+ // (2) The file's interval is contained by some other file
+ // Here, we only filter out (1).
+ for (IODeviceHandle dev : ioManager.getIODevices()) {
+ cleanupAndGetValidFilesInternal(dev, fileNameFilter, rtreeFactory, allFiles);
+ }
+
+ if (allFiles.isEmpty()) {
+ return validFiles;
+ }
+
+ if (allFiles.size() == 1) {
+ validFiles.add(new LSMComponentFileReferences(allFiles.get(0).fileRef, null, null));
+ return validFiles;
+ }
+
+ // Sort file names from earliest to latest timestamp.
+ Collections.sort(allFiles);
+
+ List<ComparableFileName> validComparableFiles = new ArrayList<ComparableFileName>();
+ ComparableFileName last = allFiles.get(0);
+ validComparableFiles.add(last);
+ for (int i = 1; i < allFiles.size(); i++) {
+ ComparableFileName current = allFiles.get(i);
+ // The current start timestamp is greater than the last stop timestamp, so current is valid.
+ if (current.interval[0].compareTo(last.interval[1]) > 0) {
+ validComparableFiles.add(current);
+ last = current;
+ } else if (current.interval[0].compareTo(last.interval[0]) >= 0
+ && current.interval[1].compareTo(last.interval[1]) <= 0) {
+ // The current file is completely contained in the interval of the
+ // last file. Thus the last file must contain at least as much information
+ // as the current file, so delete the current file.
+ current.fileRef.delete();
+ } else {
+ // This scenario should not be possible since timestamps are monotonically increasing.
+ throw new HyracksDataException("Found LSM files with overlapping timestamp intervals, "
+ + "but the intervals were not contained by another file.");
+ }
+ }
+
+ // Sort valid files in reverse lexicographical order, such that newer files come first.
+ Collections.sort(validComparableFiles, recencyCmp);
+ for (ComparableFileName cmpFileName : validComparableFiles) {
+ validFiles.add(new LSMComponentFileReferences(cmpFileName.fileRef, null, null));
+ }
+
+ return validFiles;
+ }
+}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java
index e4d2e82..b3a4ab0 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java
@@ -39,11 +39,11 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTree;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeWithAntiMatterTuples;
+import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeWithAntiMatterTuplesFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.RTreeFactory;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.tuples.LSMRTreeCopyTupleWriterFactory;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.tuples.LSMRTreeTupleWriterFactory;
@@ -172,8 +172,8 @@
IBinaryComparatorFactory[] linearizerArray = { linearizerCmpFactory,
btreeCmpFactories[btreeCmpFactories.length - 1] };
- ILSMIndexFileManager fileNameManager = new LSMIndexFileManager(ioManager, diskFileMapProvider, file,
- diskRTreeFactory, startIODeviceIndex);
+ ILSMIndexFileManager fileNameManager = new LSMRTreeWithAntiMatterTuplesFileManager(ioManager,
+ diskFileMapProvider, file, diskRTreeFactory, startIODeviceIndex);
LSMRTreeWithAntiMatterTuples lsmTree = new LSMRTreeWithAntiMatterTuples(memBufferCache, memFreePageManager,
rtreeInteriorFrameFactory, rtreeLeafFrameFactory, btreeInteriorFrameFactory, btreeLeafFrameFactory,
fileNameManager, diskRTreeFactory, bulkLoadRTreeFactory, diskFileMapProvider, typeTraits.length,
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java
index 3998108..f962200 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java
@@ -91,6 +91,15 @@
public static final int LSM_INVINDEX_SCAN_COUNT_ARRAY_SIZE = 1000000;
public static final int LSM_INVINDEX_MULTITHREAD_NUM_OPERATIONS = 200;
+ // Test params for BloomFilter
+ public static final int BLOOM_FILTER_NUM_TUPLES_TO_INSERT = 100;
+
+ // Mem configuration for BloomFilter.
+ public static final int BLOOM_FILTER_PAGE_SIZE = 256;
+ public static final int BLOOM_FILTER_NUM_PAGES = 1000;
+ public static final int BLOOM_FILTER_MAX_OPEN_FILES = 10;
+ public static final int BLOOM_FILTER_HYRACKS_FRAME_SIZE = 128;
+
}
/* ORIGINAL TEST PARAMETERS: DO NOT EDIT!
diff --git a/hyracks-tests/hyracks-storage-am-bloomfilter-test/pom.xml b/hyracks-tests/hyracks-storage-am-bloomfilter-test/pom.xml
new file mode 100644
index 0000000..3b15677
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-bloomfilter-test/pom.xml
@@ -0,0 +1,49 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-bloomfilter-test</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-tests</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.1</version>
+ <type>jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-bloomfilter</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-test-support</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/BloomFilterTest.java b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/BloomFilterTest.java
new file mode 100644
index 0000000..e31131b
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/BloomFilterTest.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.bloomfilter;
+
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.logging.Level;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.util.AbstractBloomFilterTest;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+
+@SuppressWarnings("rawtypes")
+public class BloomFilterTest extends AbstractBloomFilterTest {
+ private final Random rnd = new Random(50);
+
+ @Before
+ public void setUp() throws HyracksDataException {
+ super.setUp(); // delegates to AbstractBloomFilterTest.setUp(), which sets up the shared harness
+ }
+
+ @Test
+ public void basicTest() throws Exception {
+     // Single-integer-key round trip: probe an empty filter, add the keys, then reopen the filter.
+     if (LOGGER.isLoggable(Level.INFO)) {
+         LOGGER.info("TESTING BLOOM FILTER");
+     }
+
+     IBufferCache bufferCache = harness.getBufferCache();
+
+     final int numElements = 100;
+     final int numHashes = 5;
+     final int[] keyFields = { 0 };
+
+     BloomFilter bf = new BloomFilter(bufferCache, harness.getFileMapProvider(),
+             harness.getFileReference(), keyFields);
+
+     bf.create(numElements, numHashes);
+     bf.activate();
+
+     // Every tuple has two integer fields; only field 0 is listed in keyFields.
+     final int fieldCount = 2;
+     ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fieldCount);
+     ArrayTupleReference tuple = new ArrayTupleReference();
+
+     // Draw numElements distinct keys from the seeded generator; the TreeSet removes duplicates.
+     final int maxKey = 1000;
+     TreeSet<Integer> uniqueKeys = new TreeSet<Integer>();
+     do {
+         uniqueKeys.add(rnd.nextInt() % maxKey);
+     } while (uniqueKeys.size() < numElements);
+     ArrayList<Integer> keys = new ArrayList<Integer>(uniqueKeys);
+     final int numKeys = keys.size();
+
+     long[] hashes = new long[2];
+
+     // Phase 1: nothing has been added yet, so every probe is asserted to miss.
+     for (int pos = 0; pos < numKeys; ++pos) {
+         TupleUtils.createIntegerTuple(tupleBuilder, tuple, keys.get(pos), pos);
+         Assert.assertFalse(bf.contains(tuple, hashes));
+     }
+
+     // Phase 2: each tuple is asserted to be found right after it is added.
+     for (int pos = 0; pos < numKeys; ++pos) {
+         TupleUtils.createIntegerTuple(tupleBuilder, tuple, keys.get(pos), pos);
+         bf.add(tuple, hashes);
+         Assert.assertTrue(bf.contains(tuple, hashes));
+     }
+
+     // Phase 3: deactivate and reactivate the bloom filter, then repeat the add/contains round.
+     // NOTE(review): every tuple is re-added before it is probed, so this phase cannot detect
+     // state lost across deactivate()/activate() -- confirm whether the re-add is intended.
+     bf.deactivate();
+     bf.activate();
+     for (int pos = 0; pos < numKeys; ++pos) {
+         TupleUtils.createIntegerTuple(tupleBuilder, tuple, keys.get(pos), pos);
+         bf.add(tuple, hashes);
+         Assert.assertTrue(bf.contains(tuple, hashes));
+     }
+
+     bf.deactivate();
+     bf.destroy();
+ }
+
+ @Test
+ public void multiFieldTest() throws Exception {
+     // Composite-key round trip: the key is fields 2, 4 and 1 of a five-field tuple (string, string, integer, string, string).
+     if (LOGGER.isLoggable(Level.INFO)) {
+         LOGGER.info("TESTING BLOOM FILTER");
+     }
+
+     IBufferCache bufferCache = harness.getBufferCache();
+     int numElements = 10000;
+     int[] keyFields = { 2, 4, 1 };
+     int numHashes = 10;
+
+     BloomFilter bf = new BloomFilter(bufferCache, harness.getFileMapProvider(), harness.getFileReference(),
+             keyFields);
+     bf.create(numElements, numHashes);
+     bf.activate();
+
+     int fieldCount = 5;
+     ISerializerDeserializer[] fieldSerdes = { UTF8StringSerializerDeserializer.INSTANCE,
+             UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+             UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE };
+     ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fieldCount);
+     ArrayTupleReference tuple = new ArrayTupleReference();
+
+     long[] hashes = new long[2];
+     int maxLength = 20;
+     for (int i = 0; i < numElements; ++i) {
+         // Random.nextInt(bound) is never negative; the former rnd.nextInt() % maxLength went negative for
+         // about half the draws, and randomString() returns "" for a non-positive length.
+         String s1 = randomString(rnd.nextInt(maxLength), rnd);
+         String s2 = randomString(rnd.nextInt(maxLength), rnd);
+         String s3 = randomString(rnd.nextInt(maxLength), rnd);
+         String s4 = randomString(rnd.nextInt(maxLength), rnd);
+         TupleUtils.createTuple(tupleBuilder, tuple, fieldSerdes, s1, s2, rnd.nextInt(), s3, s4);
+         bf.add(tuple, hashes);
+         Assert.assertTrue(bf.contains(tuple, hashes));
+     }
+
+     bf.deactivate();
+     bf.destroy();
+ }
+}
diff --git a/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/MurmurHashForITupleReferenceTest.java b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/MurmurHashForITupleReferenceTest.java
new file mode 100644
index 0000000..284a6cb
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/MurmurHashForITupleReferenceTest.java
@@ -0,0 +1,296 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.bloomfilter;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.logging.Level;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.util.AbstractBloomFilterTest;
+
+@SuppressWarnings("rawtypes")
+public class MurmurHashForITupleReferenceTest extends AbstractBloomFilterTest {
+ private final static int NUM_LONG_VARS_FOR_128_BIT_HASH = 2;
+ private final static int DUMMY_FIELD = 0;
+ private final Random rnd = new Random(50);
+
+ @Before
+ public void setUp() throws HyracksDataException {
+ super.setUp(); // delegates to AbstractBloomFilterTest.setUp(), which sets up the shared harness
+ }
+
+ @Test
+ public void murmurhashONEIntegerFieldTest() throws Exception {
+     if (LOGGER.isLoggable(Level.INFO)) {
+         LOGGER.info("TESTING MURMUR HASH ONE INTEGER FIELD");
+     }
+
+     // The tuple is built with exactly one integer field, so the builder is sized for one field, not two.
+     int fieldCount = 1;
+     ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fieldCount);
+     ArrayTupleReference tuple = new ArrayTupleReference();
+     TupleUtils.createIntegerTuple(tupleBuilder, tuple, rnd.nextInt());
+     tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+
+     int keyFields[] = { 0 };
+     int length = getTupleSize(tuple, keyFields);
+
+     // Hash computed directly over the tuple by the implementation under test.
+     long actuals[] = new long[NUM_LONG_VARS_FOR_128_BIT_HASH];
+     MurmurHash128Bit.hash3_x64_128(tuple, keyFields, 0L, actuals);
+
+     // Reference hash over the same key bytes, flattened into one contiguous array.
+     byte[] array = new byte[length];
+     fillArrayWithData(array, keyFields, tuple, length);
+     long[] expecteds = hash3_x64_128(ByteBuffer.wrap(array), 0, length, 0L);
+     Assert.assertArrayEquals(expecteds, actuals);
+ }
+
+ @Test
+ public void murmurhashTwoIntegerFieldsTest() throws Exception {
+     if (LOGGER.isLoggable(Level.INFO)) {
+         LOGGER.info("TESTING MURMUR HASH TWO INTEGER FIELDS");
+     }
+
+     // Build a tuple of two random integers; both fields (0 and 1) form the hash key.
+     final int[] keyFields = { 0, 1 };
+     ArrayTupleBuilder builder = new ArrayTupleBuilder(2);
+     ArrayTupleReference tuple = new ArrayTupleReference();
+     TupleUtils.createIntegerTuple(builder, tuple, rnd.nextInt(), rnd.nextInt());
+     tuple.reset(builder.getFieldEndOffsets(), builder.getByteArray());
+
+     // Hash the key fields in place through the implementation under test.
+     long[] actuals = new long[NUM_LONG_VARS_FOR_128_BIT_HASH];
+     MurmurHash128Bit.hash3_x64_128(tuple, keyFields, 0L, actuals);
+
+     // Flatten the same key bytes into one array and hash them with the
+     // reference implementation kept in this test class.
+     int keyLength = getTupleSize(tuple, keyFields);
+     byte[] keyBytes = new byte[keyLength];
+     fillArrayWithData(keyBytes, keyFields, tuple, keyLength);
+     long[] expecteds = hash3_x64_128(ByteBuffer.wrap(keyBytes), 0, keyLength, 0L);
+
+     Assert.assertArrayEquals(expecteds, actuals);
+ }
+
+ @Test
+ public void murmurhashOneStringFieldTest() throws Exception {
+     if (LOGGER.isLoggable(Level.INFO)) {
+         LOGGER.info("TESTING MURMUR HASH ONE STRING FIELD");
+     }
+
+     // One serde and one value are supplied, so the builder is sized for one field, not two.
+     int fieldCount = 1;
+     ISerializerDeserializer[] fieldSerdes = { UTF8StringSerializerDeserializer.INSTANCE };
+     ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fieldCount);
+     ArrayTupleReference tuple = new ArrayTupleReference();
+     String s = randomString(100, rnd);
+     TupleUtils.createTuple(tupleBuilder, tuple, fieldSerdes, s);
+
+     int keyFields[] = { 0 };
+     int length = getTupleSize(tuple, keyFields);
+
+     // Hash computed directly over the tuple by the implementation under test.
+     long actuals[] = new long[NUM_LONG_VARS_FOR_128_BIT_HASH];
+     MurmurHash128Bit.hash3_x64_128(tuple, keyFields, 0L, actuals);
+
+     // Reference hash over the same key bytes, flattened into one contiguous array.
+     byte[] array = new byte[length];
+     fillArrayWithData(array, keyFields, tuple, length);
+     long[] expecteds = hash3_x64_128(ByteBuffer.wrap(array), 0, length, 0L);
+     Assert.assertArrayEquals(expecteds, actuals);
+ }
+
+ @Test
+ public void murmurhashThreeStringFieldsTest() throws Exception {
+     if (LOGGER.isLoggable(Level.INFO)) {
+         LOGGER.info("TESTING MURMUR HASH THREE STRING FIELDS");
+     }
+
+     // Three random strings of 40, 60 and 20 characters make up the tuple.
+     ISerializerDeserializer[] fieldSerdes = { UTF8StringSerializerDeserializer.INSTANCE,
+             UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE };
+     ArrayTupleBuilder builder = new ArrayTupleBuilder(fieldSerdes.length);
+     ArrayTupleReference tuple = new ArrayTupleReference();
+     String first = randomString(40, rnd);
+     String second = randomString(60, rnd);
+     String third = randomString(20, rnd);
+     TupleUtils.createTuple(builder, tuple, fieldSerdes, first, second, third);
+
+     // The key lists the fields out of storage order: 2, then 0, then 1.
+     final int[] keyFields = { 2, 0, 1 };
+
+     // Hash the key fields in place through the implementation under test.
+     long[] actuals = new long[NUM_LONG_VARS_FOR_128_BIT_HASH];
+     MurmurHash128Bit.hash3_x64_128(tuple, keyFields, 0L, actuals);
+
+     // Flatten the same key bytes in key order and hash them with the reference implementation.
+     int keyLength = getTupleSize(tuple, keyFields);
+     byte[] keyBytes = new byte[keyLength];
+     fillArrayWithData(keyBytes, keyFields, tuple, keyLength);
+     long[] expecteds = hash3_x64_128(ByteBuffer.wrap(keyBytes), 0, keyLength, 0L);
+     Assert.assertArrayEquals(expecteds, actuals);
+ }
+
+ /** Copies the bytes of the key fields, in keyFields order, into array; at most length bytes are written. */
+ private void fillArrayWithData(byte[] array, int[] keyFields, ITupleReference tuple, int length) {
+     int written = 0;
+     for (int k = 0; k < keyFields.length && written < length; ++k) {
+         int field = keyFields[k];
+         // Copy the whole field in one go: a zero-length field simply copies nothing instead of
+         // derailing a byte cursor, and the clamp keeps the write within the first length bytes.
+         int count = Math.min(tuple.getFieldLength(field), length - written);
+         System.arraycopy(tuple.getFieldData(field), tuple.getFieldStart(field), array, written, count);
+         written += count;
+     }
+ }
+
+ private int getTupleSize(ITupleReference tuple, int[] keyFields) {
+     int total = 0; // accumulated byte length of the listed key fields
+     for (int field : keyFields) {
+         total += tuple.getFieldLength(field);
+     }
+     return total;
+ }
+
+ /**
+  * Reads the eight bytes at offset + index * 8 as a little-endian long; getblock and hash3_x64_128 are borrowed from the Cassandra source for testing.
+  **/
+ protected static long getblock(ByteBuffer key, int offset, int index) {
+     int start = offset + (index << 3);
+     long block = 0L;
+     for (int b = 0; b < 8; ++b) {
+         block |= ((long) key.get(start + b) & 0xff) << (b << 3);
+     }
+     return block;
+ }
+ /** Reference 128-bit hash over length bytes of key starting at offset, seeded with seed; returns { h1, h2 }. */
+ public static long[] hash3_x64_128(ByteBuffer key, int offset, int length, long seed) {
+ final int nblocks = length >> 4; // Process as 128-bit blocks.
+
+ long h1 = seed;
+ long h2 = seed;
+ // Multiplicative mixing constants shared by the body and the tail.
+ long c1 = 0x87c37b91114253d5L;
+ long c2 = 0x4cf5ad432745937fL;
+
+ //----------
+ // body: mix every full 16-byte block into h1 and h2, reading it as two longs via getblock
+
+ for (int i = 0; i < nblocks; i++) {
+ long k1 = getblock(key, offset, i * 2 + 0);
+ long k2 = getblock(key, offset, i * 2 + 1);
+
+ k1 *= c1;
+ k1 = MurmurHash128Bit.rotl64(k1, 31);
+ k1 *= c2;
+ h1 ^= k1;
+
+ h1 = MurmurHash128Bit.rotl64(h1, 27);
+ h1 += h2;
+ h1 = h1 * 5 + 0x52dce729;
+
+ k2 *= c2;
+ k2 = MurmurHash128Bit.rotl64(k2, 33);
+ k2 *= c1;
+ h2 ^= k2;
+
+ h2 = MurmurHash128Bit.rotl64(h2, 31);
+ h2 += h1;
+ h2 = h2 * 5 + 0x38495ab5;
+ }
+
+ //----------
+ // tail: fold in the remaining (length & 15) bytes; no case has a break, so each falls through to those below
+
+ // Advance offset to the unprocessed tail of the data.
+ offset += nblocks * 16;
+
+ long k1 = 0;
+ long k2 = 0;
+ // NOTE(review): tail bytes are widened without the & 0xff mask getblock applies, so bytes >= 0x80 sign-extend; presumably this mirrors MurmurHash128Bit -- confirm before changing
+ switch (length & 15) {
+ case 15:
+ k2 ^= ((long) key.get(offset + 14)) << 48;
+ case 14:
+ k2 ^= ((long) key.get(offset + 13)) << 40;
+ case 13:
+ k2 ^= ((long) key.get(offset + 12)) << 32;
+ case 12:
+ k2 ^= ((long) key.get(offset + 11)) << 24;
+ case 11:
+ k2 ^= ((long) key.get(offset + 10)) << 16;
+ case 10:
+ k2 ^= ((long) key.get(offset + 9)) << 8;
+ case 9:
+ k2 ^= ((long) key.get(offset + 8)) << 0;
+ k2 *= c2;
+ k2 = MurmurHash128Bit.rotl64(k2, 33);
+ k2 *= c1;
+ h2 ^= k2;
+
+ case 8:
+ k1 ^= ((long) key.get(offset + 7)) << 56;
+ case 7:
+ k1 ^= ((long) key.get(offset + 6)) << 48;
+ case 6:
+ k1 ^= ((long) key.get(offset + 5)) << 40;
+ case 5:
+ k1 ^= ((long) key.get(offset + 4)) << 32;
+ case 4:
+ k1 ^= ((long) key.get(offset + 3)) << 24;
+ case 3:
+ k1 ^= ((long) key.get(offset + 2)) << 16;
+ case 2:
+ k1 ^= ((long) key.get(offset + 1)) << 8;
+ case 1:
+ k1 ^= ((long) key.get(offset));
+ k1 *= c1;
+ k1 = MurmurHash128Bit.rotl64(k1, 31);
+ k1 *= c2;
+ h1 ^= k1;
+ };
+
+ //----------
+ // finalization: xor in the length, cross-add the halves, run fmix on each, then cross-add again
+
+ h1 ^= length;
+ h2 ^= length;
+
+ h1 += h2;
+ h2 += h1;
+
+ h1 = MurmurHash128Bit.fmix(h1);
+ h2 = MurmurHash128Bit.fmix(h2);
+
+ h1 += h2;
+ h2 += h1;
+
+ return (new long[] { h1, h2 });
+ }
+}
diff --git a/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/util/AbstractBloomFilterTest.java b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/util/AbstractBloomFilterTest.java
new file mode 100644
index 0000000..9b857a6
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/util/AbstractBloomFilterTest.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.bloomfilter.util;
+
+import java.util.Random;
+import java.util.logging.Logger;
+
+import org.junit.After;
+import org.junit.Before;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+ public abstract class AbstractBloomFilterTest {
+     // Base class of the bloom filter tests; its logger is named after this class rather than the harness.
+     protected final Logger LOGGER = Logger.getLogger(AbstractBloomFilterTest.class.getName());
+     protected final BloomFilterTestHarness harness;
+
+     public AbstractBloomFilterTest() {
+         harness = new BloomFilterTestHarness(); // harness with its default settings
+     }
+
+     public AbstractBloomFilterTest(int pageSize, int numPages, int maxOpenFiles, int hyracksFrameSize) {
+         harness = new BloomFilterTestHarness(pageSize, numPages, maxOpenFiles, hyracksFrameSize);
+     }
+
+     @Before
+     public void setUp() throws HyracksDataException {
+         harness.setUp();
+     }
+
+     @After
+     public void tearDown() throws HyracksDataException {
+         harness.tearDown();
+     }
+
+     /** Returns length characters drawn from a-z using random; a non-positive length yields "". */
+     public static String randomString(int length, Random random) {
+         final String alphabet = "abcdefghijklmnopqrstuvwxyz";
+         StringBuilder strBuilder = new StringBuilder();
+         for (int i = 0; i < length; ++i) {
+             strBuilder.append(alphabet.charAt(random.nextInt(alphabet.length())));
+         }
+         return strBuilder.toString();
+     }
+ }
diff --git a/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/util/BloomFilterTestHarness.java b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/util/BloomFilterTestHarness.java
new file mode 100644
index 0000000..8fac122
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/util/BloomFilterTestHarness.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.bloomfilter.util;
+
+import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Random;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+import edu.uci.ics.hyracks.test.support.TestStorageManagerComponentHolder;
+import edu.uci.ics.hyracks.test.support.TestUtils;
+
+ public class BloomFilterTestHarness {
+     // Fixture for the bloom filter tests: supplies a buffer cache, a file map provider and a scratch file.
+     private static final long RANDOM_SEED = 50;
+     protected final int pageSize;
+     protected final int numPages;
+     protected final int maxOpenFiles;
+     protected final int hyracksFrameSize;
+
+     protected IHyracksTaskContext ctx;
+     protected IBufferCache bufferCache;
+     protected IFileMapProvider fileMapProvider;
+     protected FileReference file;
+
+     protected final Random rnd = new Random();
+     protected final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
+     protected final String tmpDir = System.getProperty("java.io.tmpdir");
+     protected final String sep = System.getProperty("file.separator");
+     protected String fileName;
+
+     public BloomFilterTestHarness() {
+         this(AccessMethodTestsConfig.BLOOM_FILTER_PAGE_SIZE, AccessMethodTestsConfig.BLOOM_FILTER_NUM_PAGES,
+                 AccessMethodTestsConfig.BLOOM_FILTER_MAX_OPEN_FILES, AccessMethodTestsConfig.BLOOM_FILTER_HYRACKS_FRAME_SIZE);
+     }
+
+     public BloomFilterTestHarness(int pageSize, int numPages, int maxOpenFiles, int hyracksFrameSize) {
+         this.pageSize = pageSize;
+         this.numPages = numPages;
+         this.maxOpenFiles = maxOpenFiles;
+         this.hyracksFrameSize = hyracksFrameSize;
+     }
+
+     public void setUp() throws HyracksDataException {
+         fileName = tmpDir + sep + simpleDateFormat.format(new Date()); // timestamp-named file under java.io.tmpdir
+         ctx = TestUtils.create(getHyracksFrameSize());
+         TestStorageManagerComponentHolder.init(pageSize, numPages, maxOpenFiles);
+         bufferCache = TestStorageManagerComponentHolder.getBufferCache(ctx);
+         fileMapProvider = TestStorageManagerComponentHolder.getFileMapProvider(ctx);
+         file = new FileReference(new File(fileName));
+         rnd.setSeed(RANDOM_SEED); // re-seeded with the same constant on every setUp
+     }
+
+     public void tearDown() throws HyracksDataException {
+         try {
+             bufferCache.close();
+         } finally {
+             file.delete(); // runs even when close() throws, so the scratch file is not left behind
+         }
+     }
+
+     public IHyracksTaskContext getHyracksTaskContext() {
+         return ctx;
+     }
+
+     public IBufferCache getBufferCache() {
+         return bufferCache;
+     }
+
+     public IFileMapProvider getFileMapProvider() {
+         return fileMapProvider;
+     }
+
+     public FileReference getFileReference() {
+         return file;
+     }
+
+     public String getFileName() {
+         return fileName;
+     }
+
+     public Random getRandom() {
+         return rnd;
+     }
+
+     public int getPageSize() {
+         return pageSize;
+     }
+
+     public int getNumPages() {
+         return numPages;
+     }
+
+     public int getHyracksFrameSize() {
+         return hyracksFrameSize;
+     }
+
+     public int getMaxOpenFiles() {
+         return maxOpenFiles;
+     }
+ }
diff --git a/hyracks-tests/pom.xml b/hyracks-tests/pom.xml
index b79295a..4011339 100644
--- a/hyracks-tests/pom.xml
+++ b/hyracks-tests/pom.xml
@@ -19,5 +19,6 @@
<module>hyracks-storage-am-lsm-btree-test</module>
<module>hyracks-storage-am-lsm-rtree-test</module>
<module>hyracks-storage-am-lsm-invertedindex-test</module>
+ <module>hyracks-storage-am-bloomfilter-test</module>
</modules>
</project>
diff --git a/pom.xml b/pom.xml
index 49a5887..c642992 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,6 +95,7 @@
<module>hyracks-cli</module>
<module>hyracks-storage-common</module>
<module>hyracks-storage-am-common</module>
+ <module>hyracks-storage-am-bloomfilter</module>
<module>hyracks-storage-am-btree</module>
<module>hyracks-storage-am-lsm-invertedindex</module>
<module>hyracks-storage-am-lsm-common</module>