merged hyracks_lsm_tree into branch -r2707:2751
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree_bloom_filter@2752 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
index e55882a..56d7837 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
@@ -152,7 +152,7 @@
IIndexDataflowHelperFactory dataflowHelperFactory = new BTreeDataflowHelperFactory();
TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
storageManager, lcManagerProvider, btreeSplitProvider, typeTraits, comparatorFactories,
- fieldPermutation, 0.7f, false, dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+ fieldPermutation, 0.7f, false, 1000L, dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
JobHelper.createPartitionConstraint(spec, btreeBulkLoad, splitNCs);
// distribute the records from the datagen via hashing to the bulk load
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
index 3a4e59f..d7480ec 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
@@ -144,7 +144,7 @@
IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.secondaryBTreeName);
TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
storageManager, lcManagerProvider, btreeSplitProvider, secondaryTypeTraits, comparatorFactories,
- fieldPermutation, 0.7f, false, dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+ fieldPermutation, 0.7f, false, 1000L, dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
JobHelper.createPartitionConstraint(spec, btreeBulkLoad, splitNCs);
// connect the ops
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
index 739f107..7e69d4d 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
@@ -172,7 +172,7 @@
int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
storageManager, lcManagerProvider, primarySplitProvider, primaryTypeTraits, primaryComparatorFactories,
- fieldPermutation, 0.7f, true, dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+ fieldPermutation, 0.7f, true, 1000L, dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -234,7 +234,7 @@
int[] fieldPermutation = { 3, 0 };
TreeIndexBulkLoadOperatorDescriptor secondaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
storageManager, lcManagerProvider, secondarySplitProvider, secondaryTypeTraits,
- secondaryComparatorFactories, fieldPermutation, 0.7f, true, dataflowHelperFactory,
+ secondaryComparatorFactories, fieldPermutation, 0.7f, true, 1000L, dataflowHelperFactory,
NoOpOperationCallbackFactory.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBtreeBulkLoad, NC1_ID);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
index 05b7a26..3a77b60 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
@@ -199,7 +199,7 @@
int[] fieldPermutation = { 0, 1 };
TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
storageManager, lcManagerProvider, primaryFileSplitProvider, primaryTypeTraits,
- primaryComparatorFactories, fieldPermutation, 0.7f, true, btreeDataflowHelperFactory,
+ primaryComparatorFactories, fieldPermutation, 0.7f, true, 1000L, btreeDataflowHelperFactory,
NoOpOperationCallbackFactory.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
return primaryBtreeBulkLoad;
@@ -273,7 +273,7 @@
private IOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec, int[] fieldPermutation) {
LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new LSMInvertedIndexBulkLoadOperatorDescriptor(
- spec, fieldPermutation, true, storageManager, btreeFileSplitProvider, lcManagerProvider,
+ spec, fieldPermutation, true, 1000L, storageManager, btreeFileSplitProvider, lcManagerProvider,
tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, invListsComparatorFactories,
tokenizerFactory, invertedIndexDataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, invIndexBulkLoadOp, NC1_ID);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
index 428b354..97f6908 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
@@ -223,7 +223,7 @@
int[] fieldPermutation = { 0, 1, 2, 4, 5, 7, 9, 10, 11, 12 };
TreeIndexBulkLoadOperatorDescriptor primaryBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
storageManager, lcManagerProvider, primarySplitProvider, primaryTypeTraits, primaryComparatorFactories,
- fieldPermutation, 0.7f, false, btreeDataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+ fieldPermutation, 0.7f, false, 1000L, btreeDataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBulkLoad, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -279,7 +279,7 @@
int[] fieldPermutation = { 6, 7, 8, 9, 0 };
TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
storageManager, lcManagerProvider, secondarySplitProvider, secondaryTypeTraits,
- secondaryComparatorFactories, fieldPermutation, 0.7f, false, rtreeDataflowHelperFactory,
+ secondaryComparatorFactories, fieldPermutation, 0.7f, false, 1000L, rtreeDataflowHelperFactory,
NoOpOperationCallbackFactory.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBulkLoad, NC1_ID);
diff --git a/hyracks-storage-am-bloomfilter/pom.xml b/hyracks-storage-am-bloomfilter/pom.xml
new file mode 100644
index 0000000..dab96f9
--- /dev/null
+++ b/hyracks-storage-am-bloomfilter/pom.xml
@@ -0,0 +1,42 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-bloomfilter</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-common</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.1</version>
+ <type>jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilter.java b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
new file mode 100644
index 0000000..a6a45de
--- /dev/null
+++ b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
@@ -0,0 +1,360 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.bloomfilter.impls;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+/**
+ * A bloom filter whose bit array is stored in the pages of a buffer-cache file.
+ * <p>
+ * Page 0 is a metadata page holding the number of bit pages, the number of hash functions, the number of
+ * elements the filter was sized for, and the total number of bits. Pages 1..numPages hold the bit array;
+ * while the filter is activated those pages stay pinned, so lookups do not pin them again.
+ * <p>
+ * The i-th probe of a key is bit |(h0 + i * h1) mod numBits|, where h0 and h1 are the two 64-bit halves of
+ * the key's MurmurHash3 (see {@link MurmurHash128Bit}).
+ */
+public class BloomFilter {
+
+    private final static int METADATA_PAGE_ID = 0;
+    private final static int NUM_PAGES_OFFSET = 0; // 0
+    private final static int NUM_HASHES_USED_OFFSET = NUM_PAGES_OFFSET + 4; // 4
+    private final static int NUM_ELEMENTS_OFFSET = NUM_HASHES_USED_OFFSET + 4; // 8
+    private final static int NUM_BITS_OFFSET = NUM_ELEMENTS_OFFSET + 8; // 16
+
+    /** Number of bits a builder allocates per expected element. */
+    private final static int NUM_BITS_PER_ELEMENT = 10;
+
+    private final IBufferCache bufferCache;
+    private final IFileMapProvider fileMapProvider;
+    private final FileReference file;
+    private final int[] keyFields;
+    private int fileId = -1;
+    private boolean isActivated = false;
+
+    // In-memory copy of the metadata page, refreshed by readBloomFilterMetaData().
+    private int numPages;
+    private int numHashes;
+    private long numElements;
+    private long numBits;
+    private final int numBitsPerPage;
+
+    // Bit pages 1..numPages; pinned while the filter is activated.
+    private final ArrayList<ICachedPage> bloomFilterPages = new ArrayList<ICachedPage>();
+    private final static long SEED = 0L;
+
+    public BloomFilter(IBufferCache bufferCache, IFileMapProvider fileMapProvider, FileReference file, int[] keyFields)
+            throws HyracksDataException {
+        this.bufferCache = bufferCache;
+        this.fileMapProvider = fileMapProvider;
+        this.file = file;
+        this.keyFields = keyFields;
+        numBitsPerPage = bufferCache.getPageSize() * Byte.SIZE;
+    }
+
+    public int getFileId() {
+        return fileId;
+    }
+
+    public FileReference getFileReference() {
+        return file;
+    }
+
+    public int getNumPages() throws HyracksDataException {
+        if (!isActivated) {
+            throw new HyracksDataException("The bloom filter is not activated.");
+        }
+        return numPages;
+    }
+
+    public long getNumElements() throws HyracksDataException {
+        if (!isActivated) {
+            throw new HyracksDataException("The bloom filter is not activated.");
+        }
+        return numElements;
+    }
+
+    /**
+     * Tests whether the key fields of {@code tuple} may have been added to this filter.
+     *
+     * @param tuple
+     *            the tuple whose key fields are hashed
+     * @param hashes
+     *            scratch array of length at least 2; overwritten with the two halves of the key's hash
+     * @return false if the key was definitely not added, true if it possibly was
+     */
+    public boolean contains(ITupleReference tuple, long[] hashes) {
+        if (numBits == 0) {
+            // Created but never built, or built for zero elements: nothing is a member, and the modulo
+            // in bitPosition() would otherwise throw ArithmeticException.
+            return false;
+        }
+        MurmurHash128Bit.hash3_x64_128(tuple, keyFields, SEED, hashes);
+        for (int i = 0; i < numHashes; ++i) {
+            long bit = bitPosition(hashes, i, numBits);
+            ByteBuffer buffer = bloomFilterPages.get((int) (bit / numBitsPerPage)).getBuffer();
+            int bitInPage = (int) (bit % numBitsPerPage);
+            if ((buffer.get(bitInPage >> 3) & (1 << (bitInPage & 0x07))) == 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Returns the i-th probe position, in [0, numBits), derived from the two hash halves. */
+    private static long bitPosition(long[] hashes, int i, long numBits) {
+        return Math.abs((hashes[0] + (long) i * hashes[1]) % numBits);
+    }
+
+    private void prepareFile() throws HyracksDataException {
+        boolean fileIsMapped = false;
+        synchronized (fileMapProvider) {
+            fileIsMapped = fileMapProvider.isMapped(file);
+            if (!fileIsMapped) {
+                bufferCache.createFile(file);
+            }
+            fileId = fileMapProvider.lookupFileId(file);
+            try {
+                // Also creates the file if it doesn't exist yet.
+                bufferCache.openFile(fileId);
+            } catch (HyracksDataException e) {
+                // Revert state of buffer cache since file failed to open.
+                if (!fileIsMapped) {
+                    bufferCache.deleteFile(fileId, false);
+                }
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * Initializes the persistent state: writes a metadata page describing an empty filter (no bit pages,
+     * hash functions, elements or bits) and closes the file again.
+     *
+     * @throws HyracksDataException
+     *             if the filter is activated, or the buffer cache fails
+     */
+    public synchronized void create() throws HyracksDataException {
+        if (isActivated) {
+            throw new HyracksDataException("Failed to create the bloom filter since it is activated.");
+        }
+        prepareFile();
+        try {
+            writeBloomFilterMetaData(true, 0, 0, 0L, 0L);
+        } finally {
+            bufferCache.closeFile(fileId);
+        }
+    }
+
+    /**
+     * Opens the file, loads the metadata page and pins every bit page so that {@link #contains} can be
+     * used. Does nothing if the filter is already activated.
+     *
+     * @throws HyracksDataException
+     *             if the buffer cache fails
+     */
+    public synchronized void activate() throws HyracksDataException {
+        if (isActivated) {
+            return;
+        }
+        prepareFile();
+        readBloomFilterMetaData();
+        for (int pageId = 1; pageId <= numPages; ++pageId) {
+            bloomFilterPages.add(bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false));
+        }
+        isActivated = true;
+    }
+
+    /** Loads numPages, numHashes, numElements and numBits from the metadata page. */
+    private void readBloomFilterMetaData() throws HyracksDataException {
+        ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), false);
+        try {
+            metaPage.acquireReadLatch();
+            try {
+                ByteBuffer buffer = metaPage.getBuffer();
+                numPages = buffer.getInt(NUM_PAGES_OFFSET);
+                numHashes = buffer.getInt(NUM_HASHES_USED_OFFSET);
+                numElements = buffer.getLong(NUM_ELEMENTS_OFFSET);
+                numBits = buffer.getLong(NUM_BITS_OFFSET);
+            } finally {
+                metaPage.releaseReadLatch();
+            }
+        } finally {
+            bufferCache.unpin(metaPage);
+        }
+    }
+
+    /**
+     * Writes the given values to the metadata page.
+     *
+     * @param newPage
+     *            passed as the second argument of {@code bufferCache.pin}; true when the file is being created
+     */
+    private void writeBloomFilterMetaData(boolean newPage, int numPages, int numHashes, long numElements,
+            long numBits) throws HyracksDataException {
+        ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), newPage);
+        try {
+            metaPage.acquireWriteLatch();
+            try {
+                ByteBuffer buffer = metaPage.getBuffer();
+                buffer.putInt(NUM_PAGES_OFFSET, numPages);
+                buffer.putInt(NUM_HASHES_USED_OFFSET, numHashes);
+                buffer.putLong(NUM_ELEMENTS_OFFSET, numElements);
+                buffer.putLong(NUM_BITS_OFFSET, numBits);
+            } finally {
+                metaPage.releaseWriteLatch();
+            }
+        } finally {
+            bufferCache.unpin(metaPage);
+        }
+    }
+
+    /** Unpins every bit page held in bloomFilterPages and empties the list. */
+    private void unpinBloomFilterPages() throws HyracksDataException {
+        for (ICachedPage page : bloomFilterPages) {
+            bufferCache.unpin(page);
+        }
+        bloomFilterPages.clear();
+    }
+
+    /**
+     * Unpins all bit pages and closes the file. Does nothing if the filter is not activated.
+     *
+     * @throws HyracksDataException
+     *             if the buffer cache fails
+     */
+    public synchronized void deactivate() throws HyracksDataException {
+        if (!isActivated) {
+            return;
+        }
+        unpinBloomFilterPages();
+        bufferCache.closeFile(fileId);
+        isActivated = false;
+    }
+
+    /**
+     * Deletes the file backing this filter.
+     *
+     * @throws HyracksDataException
+     *             if the filter is activated, or the buffer cache fails
+     */
+    public synchronized void destroy() throws HyracksDataException {
+        if (isActivated) {
+            throw new HyracksDataException("Failed to destroy the bloom filter since it is activated.");
+        }
+
+        file.delete();
+        if (fileId == -1) {
+            return;
+        }
+        bufferCache.deleteFile(fileId, false);
+        fileId = -1;
+    }
+
+    /**
+     * Returns a loader that re-sizes this (activated) filter for {@code numElements} elements and sets the
+     * probe bits of every tuple passed to its {@code add} method.
+     *
+     * @param numElements
+     *            expected number of elements; the filter gets NUM_BITS_PER_ELEMENT bits per element
+     * @param numHashes
+     *            number of probes (hash functions) per element
+     */
+    public IIndexBulkLoader createBuilder(long numElements, int numHashes) throws HyracksDataException {
+        return new BloomFilterBuilder(numElements, numHashes);
+    }
+
+    /** Bulk loader that sizes the filter and then sets the probe bits of every added tuple. */
+    public class BloomFilterBuilder implements IIndexBulkLoader {
+        private final long[] hashes = new long[2];
+
+        private final long numElements;
+        private final int numHashes;
+        private final long numBits;
+        private final int numPages;
+
+        public BloomFilterBuilder(long numElements, int numHashes) throws HyracksDataException {
+            if (!isActivated) {
+                throw new HyracksDataException("Failed to create the bloom filter builder since it is not activated.");
+            }
+            if (numElements < 0 || numHashes < 0) {
+                throw new HyracksDataException("Invalid bloom filter specification: numElements = " + numElements
+                        + ", numHashes = " + numHashes + ".");
+            }
+            this.numElements = numElements;
+            this.numHashes = numHashes;
+            numBits = numElements * NUM_BITS_PER_ELEMENT;
+            long tmp = (long) Math.ceil(numBits / (double) numBitsPerPage);
+            if (tmp > Integer.MAX_VALUE) {
+                throw new HyracksDataException("Cannot create a bloom filter with this huge number of pages.");
+            }
+            numPages = (int) tmp;
+            writeBloomFilterMetaData(false, numPages, numHashes, numElements, numBits);
+            readBloomFilterMetaData();
+
+            // Release pages pinned by activate() for a previously built filter; keeping them would put stale
+            // entries ahead of the new pages (wrong page lookups) and leak pins in deactivate().
+            unpinBloomFilterPages();
+            int pageSize = numBitsPerPage / Byte.SIZE;
+            for (int pageId = 1; pageId <= numPages; ++pageId) {
+                ICachedPage page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), true);
+                page.acquireWriteLatch();
+                bloomFilterPages.add(page);
+                // NOTE(review): pin(..., true) presumably skips the disk read, so the frame may still hold
+                // another page's bytes; every bit must start cleared or the false-positive rate degrades.
+                ByteBuffer buffer = page.getBuffer();
+                for (int i = 0; i < pageSize; ++i) {
+                    buffer.put(i, (byte) 0);
+                }
+            }
+        }
+
+        @Override
+        public void add(ITupleReference tuple) throws IndexException, HyracksDataException {
+            if (numBits == 0) {
+                throw new HyracksDataException("Cannot add a tuple to a bloom filter sized for zero elements.");
+            }
+            MurmurHash128Bit.hash3_x64_128(tuple, keyFields, SEED, hashes);
+            for (int i = 0; i < numHashes; ++i) {
+                long bit = bitPosition(hashes, i, numBits);
+                ByteBuffer buffer = bloomFilterPages.get((int) (bit / numBitsPerPage)).getBuffer();
+                int bitInPage = (int) (bit % numBitsPerPage);
+                int byteIndex = bitInPage >> 3; // divide by 8
+                buffer.put(byteIndex, (byte) (buffer.get(byteIndex) | (1 << (bitInPage & 0x07))));
+            }
+        }
+
+        /** Releases the write latches taken in the constructor; the bit pages stay pinned for lookups. */
+        @Override
+        public void end() throws HyracksDataException, IndexException {
+            for (ICachedPage page : bloomFilterPages) {
+                page.releaseWriteLatch();
+            }
+        }
+
+    }
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilterFactory.java b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilterFactory.java
new file mode 100644
index 0000000..8bd419a
--- /dev/null
+++ b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilterFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.bloomfilter.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+/**
+ * Creates {@link BloomFilter} instances that share one buffer cache, file map provider and key-field list.
+ */
+public class BloomFilterFactory {
+    private final IBufferCache bufferCache;
+    private final IFileMapProvider fileMapProvider;
+    private final int[] keyFields;
+
+    public BloomFilterFactory(IBufferCache bufferCache, IFileMapProvider fileMapProvider, int[] keyFields) {
+        this.bufferCache = bufferCache;
+        this.fileMapProvider = fileMapProvider;
+        this.keyFields = keyFields;
+    }
+
+    /**
+     * Returns a new, not yet activated, bloom filter backed by {@code file}.
+     */
+    public BloomFilter createBloomFilterInstance(FileReference file) throws HyracksDataException {
+        return new BloomFilter(bufferCache, fileMapProvider, file, keyFields);
+    }
+
+    /**
+     * @deprecated misspelled name kept for existing callers; use {@link #createBloomFilterInstance(FileReference)}.
+     */
+    @Deprecated
+    public BloomFilter createBloomFiltertInstance(FileReference file) throws HyracksDataException {
+        return createBloomFilterInstance(file);
+    }
+}
diff --git a/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilterSpecification.java b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilterSpecification.java
new file mode 100644
index 0000000..c396f9b
--- /dev/null
+++ b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilterSpecification.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.bloomfilter.impls;
+
+/**
+ * Immutable sizing parameters for a bloom filter: the expected number of elements and the number of hash
+ * functions.
+ */
+public final class BloomFilterSpecification {
+    private final int numHashes;
+    private final long numElements;
+
+    public BloomFilterSpecification(long numElements, int numHashes) {
+        this.numHashes = numHashes;
+        this.numElements = numElements;
+    }
+
+    /** Returns the expected number of elements. */
+    public long getNumElements() {
+        return this.numElements;
+    }
+
+    /** Returns the number of hash functions. */
+    public int getNumHashes() {
+        return this.numHashes;
+    }
+}
diff --git a/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/MurmurHash128Bit.java b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/MurmurHash128Bit.java
new file mode 100644
index 0000000..b1ec77d
--- /dev/null
+++ b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/MurmurHash128Bit.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.bloomfilter.impls;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * 128-bit MurmurHash3 (x64 variant) over the concatenated bytes of selected tuple fields.
+ * <p>
+ * The idea of this class is borrowed from http://murmurhash.googlepages.com/ and the Cassandra source code.
+ * It follows MurmurHash3_x64_128 but reads its input from an {@link ITupleReference} instead of a byte
+ * array: the key is the concatenation of the bytes of the fields listed in {@code keyFields}, in order.
+ **/
+public class MurmurHash128Bit {
+
+    private static final long C1 = 0x87c37b91114253d5L;
+    private static final long C2 = 0x4cf5ad432745937fL;
+
+    /** Rotates {@code v} left by {@code n} bits. */
+    public static long rotl64(long v, int n) {
+        return ((v << n) | (v >>> (64 - n)));
+    }
+
+    /** MurmurHash3 64-bit finalization mix. */
+    public static long fmix(long k) {
+        k ^= k >>> 33;
+        k *= 0xff51afd7ed558ccdL;
+        k ^= k >>> 33;
+        k *= 0xc4ceb9fe1a85ec53L;
+        k ^= k >>> 33;
+
+        return k;
+    }
+
+    /**
+     * Computes the 128-bit hash of the key and stores it in {@code hashes}.
+     * <p>
+     * Bytes are consumed field by field, so a 16-byte block may span several fields; empty fields contribute
+     * nothing. Every byte is treated as unsigned, as in the reference implementation.
+     *
+     * @param tuple
+     *            the tuple to read the key bytes from
+     * @param keyFields
+     *            indexes of the key fields, in key order
+     * @param seed
+     *            initial value of both hash halves
+     * @param hashes
+     *            output array of length at least 2; receives the two 64-bit halves of the hash
+     */
+    public static void hash3_x64_128(ITupleReference tuple, int[] keyFields, long seed, long[] hashes) {
+        long h1 = seed;
+        long h2 = seed;
+
+        // k1 and k2 accumulate the current 16-byte block (little-endian); blockPos counts its bytes.
+        long k1 = 0L;
+        long k2 = 0L;
+        int blockPos = 0;
+        int length = 0;
+
+        //----------
+        // body: mix every complete 16-byte block of the concatenated key
+
+        for (int f = 0; f < keyFields.length; ++f) {
+            byte[] data = tuple.getFieldData(keyFields[f]);
+            int start = tuple.getFieldStart(keyFields[f]);
+            int fieldLength = tuple.getFieldLength(keyFields[f]);
+            length += fieldLength;
+            for (int i = 0; i < fieldLength; ++i) {
+                long b = data[start + i] & 0xffL;
+                if (blockPos < 8) {
+                    k1 |= b << (blockPos << 3);
+                } else {
+                    k2 |= b << ((blockPos - 8) << 3);
+                }
+                if (++blockPos == 16) {
+                    k1 *= C1;
+                    k1 = rotl64(k1, 31);
+                    k1 *= C2;
+                    h1 ^= k1;
+
+                    h1 = rotl64(h1, 27);
+                    h1 += h2;
+                    h1 = h1 * 5 + 0x52dce729;
+
+                    k2 *= C2;
+                    k2 = rotl64(k2, 33);
+                    k2 *= C1;
+                    h2 ^= k2;
+
+                    h2 = rotl64(h2, 31);
+                    h2 += h1;
+                    h2 = h2 * 5 + 0x38495ab5;
+
+                    k1 = 0L;
+                    k2 = 0L;
+                    blockPos = 0;
+                }
+            }
+        }
+
+        //----------
+        // tail: the last (length & 15) bytes are already in k1 (bytes 0-7) and k2 (bytes 8-15)
+
+        if (blockPos > 8) {
+            k2 *= C2;
+            k2 = rotl64(k2, 33);
+            k2 *= C1;
+            h2 ^= k2;
+        }
+        if (blockPos > 0) {
+            k1 *= C1;
+            k1 = rotl64(k1, 31);
+            k1 *= C2;
+            h1 ^= k1;
+        }
+
+        //----------
+        // finalization
+
+        h1 ^= length;
+        h2 ^= length;
+
+        h1 += h2;
+        h2 += h1;
+
+        h1 = fmix(h1);
+        h2 = fmix(h2);
+
+        h1 += h2;
+        h2 += h1;
+
+        hashes[0] = h1;
+        hashes[1] = h2;
+    }
+
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
index d825b89..86bc32a 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
@@ -910,7 +910,8 @@
}
@Override
- public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput) throws TreeIndexException {
+ public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+ throws TreeIndexException {
try {
return new BTreeBulkLoader(fillFactor, verifyInput);
} catch (HyracksDataException e) {
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndex.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndex.java
index d6d74ee..1557c75 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndex.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndex.java
@@ -22,69 +22,62 @@
* This interface describes the operations common to all indexes. Indexes
* implementing this interface can easily reuse existing index operators for
* dataflow. Users must perform operations on an via an {@link IIndexAccessor}.
- *
- * During dataflow, the lifecycle of IIndexes are handled through an
- * {@link IIndexLifecycleManager}.
+ * During dataflow, the lifecycle of IIndexes are handled through an {@link IIndexLifecycleManager}.
*/
public interface IIndex {
/**
- * Initializes the persistent state of an index.
- *
+ * Initializes the persistent state of an index.
* An index cannot be created if it is in the activated state.
* Calling create on an index that is deactivated has the effect of clearing the index.
*
- * @throws HyracksDataException
- * if there is an error in the BufferCache while (un)pinning pages, (un)latching pages,
- * creating files, or deleting files
- *
- * if the index is in the activated state
+ * @throws HyracksDataException
+ * if there is an error in the BufferCache while (un)pinning pages, (un)latching pages,
+ * creating files, or deleting files
+ * if the index is in the activated state
*/
public void create() throws HyracksDataException;
/**
- * Initializes the index's operational state. An index in the activated state may perform
+ * Initializes the index's operational state. An index in the activated state may perform
* operations via an {@link IIndexAccessor}.
*
* @throws HyracksDataException
- * if there is a problem in the BufferCache while (un)pinning pages, (un)latching pages,
- * creating files, or deleting files
+ * if there is a problem in the BufferCache while (un)pinning pages, (un)latching pages,
+ * creating files, or deleting files
*/
public void activate() throws HyracksDataException;
/**
- * Resets the operational state of the index. Calling clear has the same logical effect
- * as calling deactivate(), destroy(), create(), then activate(), but not necessarily the
+ * Resets the operational state of the index. Calling clear has the same logical effect
+ * as calling deactivate(), destroy(), create(), then activate(), but not necessarily the
* same physical effect.
*
* @throws HyracksDataException
- * if there is a problem in the BufferCache while (un)pinning pages, (un)latching pages,
- * creating files, or deleting files
- *
- * if the index is not in the activated state
+ * if there is a problem in the BufferCache while (un)pinning pages, (un)latching pages,
+ * creating files, or deleting files
+ * if the index is not in the activated state
*/
public void clear() throws HyracksDataException;
/**
- * Deinitializes the index's operational state. An index in the deactivated state may not
+ * Deinitializes the index's operational state. An index in the deactivated state may not
* perform operations.
*
* @throws HyracksDataException
- * if there is a problem in the BufferCache while (un)pinning pages, (un)latching pages,
- * creating files, or deleting files
+ * if there is a problem in the BufferCache while (un)pinning pages, (un)latching pages,
+ * creating files, or deleting files
*/
public void deactivate() throws HyracksDataException;
/**
- * Removes the persistent state of an index.
- *
+ * Removes the persistent state of an index.
* An index cannot be destroyed if it is in the activated state.
*
- * @throws HyracksDataException
- * if there is an error in the BufferCache while (un)pinning pages, (un)latching pages,
- * creating files, or deleting files
- *
- * if the index is already activated
+ * @throws HyracksDataException
+ * if there is an error in the BufferCache while (un)pinning pages, (un)latching pages,
+ * creating files, or deleting files
+ * if the index is already activated
*/
public void destroy() throws HyracksDataException;
@@ -94,8 +87,10 @@
* on the same {@link IIndex}.
*
* @returns IIndexAccessor an accessor for this {@link IIndex}
- * @param modificationCallback the callback to be used for modification operations
- * @param searchCallback the callback to be used for search operations
+ * @param modificationCallback
+ * the callback to be used for modification operations
+ * @param searchCallback
+ * the callback to be used for search operations
*/
public IIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
ISearchOperationCallback searchCallback);
@@ -105,7 +100,7 @@
* An assertion error is thrown if validation fails.
*
* @throws HyracksDataException
- * if there is an error performing validation
+ * if there is an error performing validation
*/
public void validate() throws HyracksDataException;
@@ -124,5 +119,6 @@
* @param verifyInput
+ * @param numElementsHint
+ *            an estimate of the number of elements that will be bulk-loaded; implementations may use it
+ *            to size auxiliary structures (e.g. a bloom filter) or ignore it
* @throws IndexException
*/
- public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput) throws IndexException;
+ public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+ throws IndexException;
}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index de4e627..1b6271d 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -33,6 +33,7 @@
private final IHyracksTaskContext ctx;
private final float fillFactor;
private final boolean verifyInput;
+ private final long numElementsHint;
private final IIndexDataflowHelper indexHelper;
private FrameTupleAccessor accessor;
private IIndex index;
@@ -41,12 +42,14 @@
private PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
public IndexBulkLoadOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
- int[] fieldPermutation, float fillFactor, boolean verifyInput, IRecordDescriptorProvider recordDescProvider) {
+ int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint,
+ IRecordDescriptorProvider recordDescProvider) {
this.opDesc = opDesc;
this.ctx = ctx;
this.indexHelper = opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(opDesc, ctx, partition);
this.fillFactor = fillFactor;
this.verifyInput = verifyInput;
+ this.numElementsHint = numElementsHint;
this.recDescProvider = recordDescProvider;
tuple.setFieldPermutation(fieldPermutation);
}
@@ -58,7 +61,7 @@
indexHelper.open();
index = indexHelper.getIndexInstance();
try {
- bulkLoader = index.createBulkLoader(fillFactor, verifyInput);
+ bulkLoader = index.createBulkLoader(fillFactor, verifyInput, numElementsHint);
} catch (Exception e) {
indexHelper.close();
throw new HyracksDataException(e);
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
index fea6463..c58dee5 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
@@ -35,12 +35,13 @@
private final int[] fieldPermutation;
private final float fillFactor;
private final boolean verifyInput;
+ private final long numElementsHint;
public TreeIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec,
IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lifecycleManagerProvider,
IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, int[] fieldPermutation, float fillFactor,
- boolean verifyInput, IIndexDataflowHelperFactory dataflowHelperFactory,
+ boolean verifyInput, long numElementsHint, IIndexDataflowHelperFactory dataflowHelperFactory,
IModificationOperationCallbackFactory modificationOpCallbackFactory) {
super(spec, 1, 0, null, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, dataflowHelperFactory, null, false, NoOpLocalResourceFactoryProvider.INSTANCE,
@@ -48,12 +49,13 @@
this.fieldPermutation = fieldPermutation;
this.fillFactor = fillFactor;
this.verifyInput = verifyInput;
+ this.numElementsHint = numElementsHint;
}
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
return new IndexBulkLoadOperatorNodePushable(this, ctx, partition, fieldPermutation, fillFactor, verifyInput,
- recordDescProvider);
+ numElementsHint, recordDescProvider);
}
}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-btree/pom.xml b/hyracks-storage-am-lsm-btree/pom.xml
index 17f3714..afef819 100644
--- a/hyracks-storage-am-lsm-btree/pom.xml
+++ b/hyracks-storage-am-lsm-btree/pom.xml
@@ -33,6 +33,13 @@
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-bloomfilter</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-lsm-common</artifactId>
<version>0.2.2-SNAPSHOT</version>
<type>jar</type>
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 4cd4974..b557125 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -23,8 +23,12 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory;
import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeBulkLoader;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
@@ -64,7 +68,6 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallbackWrapper;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMFlushOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
@@ -86,13 +89,16 @@
private final ITreeIndexFrameFactory deleteLeafFrameFactory;
private final IBinaryComparatorFactory[] cmpFactories;
+ private final int numHashes = 10;
+
public LSMBTree(IInMemoryBufferCache memBufferCache, IInMemoryFreePageManager memFreePageManager,
ITreeIndexFrameFactory interiorFrameFactory, ITreeIndexFrameFactory insertLeafFrameFactory,
ITreeIndexFrameFactory deleteLeafFrameFactory, ILSMIndexFileManager fileManager,
TreeIndexFactory<BTree> diskBTreeFactory, TreeIndexFactory<BTree> bulkLoadBTreeFactory,
- IFileMapProvider diskFileMapProvider, int fieldCount, IBinaryComparatorFactory[] cmpFactories,
- ILSMMergePolicy mergePolicy, ILSMOperationTrackerFactory opTrackerFactory,
- ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackProvider ioOpCallbackProvider) {
+ BloomFilterFactory bloomFilterFactory, IFileMapProvider diskFileMapProvider, int fieldCount,
+ IBinaryComparatorFactory[] cmpFactories, ILSMMergePolicy mergePolicy,
+ ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationScheduler ioScheduler,
+ ILSMIOOperationCallbackProvider ioOpCallbackProvider) {
super(memFreePageManager, diskBTreeFactory.getBufferCache(), fileManager, diskFileMapProvider, mergePolicy,
opTrackerFactory, ioScheduler, ioOpCallbackProvider);
mutableComponent = new LSMBTreeMutableComponent(new BTree(memBufferCache,
@@ -102,8 +108,8 @@
this.insertLeafFrameFactory = insertLeafFrameFactory;
this.deleteLeafFrameFactory = deleteLeafFrameFactory;
this.cmpFactories = cmpFactories;
- componentFactory = new LSMBTreeImmutableComponentFactory(diskBTreeFactory);
- bulkLoadComponentFactory = new LSMBTreeImmutableComponentFactory(bulkLoadBTreeFactory);
+ componentFactory = new LSMBTreeImmutableComponentFactory(diskBTreeFactory, bloomFilterFactory);
+ bulkLoadComponentFactory = new LSMBTreeImmutableComponentFactory(bulkLoadBTreeFactory, bloomFilterFactory);
}
@Override
@@ -135,14 +141,15 @@
throw new HyracksDataException(e);
}
for (LSMComponentFileReferences lsmComonentFileReference : validFileReferences) {
- LSMBTreeImmutableComponent btree;
+ LSMBTreeImmutableComponent component;
try {
- btree = createDiskComponent(componentFactory, lsmComonentFileReference.getInsertIndexFileReference(),
- false);
+ component = createDiskComponent(componentFactory,
+ lsmComonentFileReference.getInsertIndexFileReference(),
+ lsmComonentFileReference.getBloomFilterFileReference(), 0L, 0, false);
} catch (IndexException e) {
throw new HyracksDataException(e);
}
- immutableComponents.add(btree);
+ immutableComponents.add(component);
}
isActivated = true;
}
@@ -167,8 +174,11 @@
List<ILSMComponent> immutableComponents = componentsRef.get();
for (ILSMComponent c : immutableComponents) {
- BTree btree = (BTree) ((LSMBTreeImmutableComponent) c).getBTree();
+ LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) c;
+ BTree btree = component.getBTree();
+ BloomFilter bloomFilter = component.getBloomFilter();
btree.deactivate();
+ bloomFilter.deactivate();
}
mutableComponent.getBTree().deactivate();
mutableComponent.getBTree().destroy();
@@ -189,8 +199,9 @@
List<ILSMComponent> immutableComponents = componentsRef.get();
for (ILSMComponent c : immutableComponents) {
- BTree btree = (BTree) ((LSMBTreeImmutableComponent) c).getBTree();
- btree.destroy();
+ LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) c;
+ component.getBTree().destroy();
+ component.getBloomFilter().destroy();
}
mutableComponent.getBTree().destroy();
fileManager.deleteDirs();
@@ -205,9 +216,11 @@
List<ILSMComponent> immutableComponents = componentsRef.get();
mutableComponent.getBTree().clear();
for (ILSMComponent c : immutableComponents) {
- BTree btree = (BTree) ((LSMBTreeImmutableComponent) c).getBTree();
- btree.deactivate();
- btree.destroy();
+ LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) c;
+ component.getBloomFilter().deactivate();
+ component.getBTree().deactivate();
+ component.getBloomFilter().destroy();
+ component.getBTree().destroy();
}
immutableComponents.clear();
}
@@ -346,28 +359,48 @@
opCtx.setOperation(IndexOperation.FLUSH);
opCtx.getComponentHolder().add(flushingComponent);
ILSMIndexAccessorInternal flushAccessor = new LSMBTreeAccessor(lsmHarness, opCtx);
- ioScheduler.scheduleOperation(new LSMFlushOperation(flushAccessor, flushingComponent, componentFileRefs
- .getInsertIndexFileReference(), callback));
+ ioScheduler.scheduleOperation(new LSMBTreeFlushOperation(flushAccessor, flushingComponent, componentFileRefs
+ .getInsertIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), callback));
}
@Override
public ILSMComponent flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
- LSMFlushOperation flushOp = (LSMFlushOperation) operation;
+ LSMBTreeFlushOperation flushOp = (LSMBTreeFlushOperation) operation;
LSMBTreeMutableComponent flushingComponent = (LSMBTreeMutableComponent) flushOp.getFlushingComponent();
IIndexAccessor accessor = flushingComponent.getBTree().createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
- IIndexCursor scanCursor = accessor.createSearchCursor();
+
RangePredicate nullPred = new RangePredicate(null, null, true, true, null, null);
+ IIndexCursor countingCursor = ((BTreeAccessor) accessor).createCountingSearchCursor();
+ accessor.search(countingCursor, nullPred);
+ long numElements = 0L;
+ try {
+ while (countingCursor.hasNext()) {
+ countingCursor.next();
+ ITupleReference countTuple = countingCursor.getTuple();
+ numElements = IntegerSerializerDeserializer.getInt(countTuple.getFieldData(0),
+ countTuple.getFieldStart(0));
+ }
+ } finally {
+ countingCursor.close();
+ }
+
+ LSMBTreeImmutableComponent component = createDiskComponent(componentFactory, flushOp.getBTreeFlushTarget(),
+ flushOp.getBloomFilterFlushTarget(), numElements, numHashes, true);
+ IIndexBulkLoader bulkLoader = component.getBTree().createBulkLoader(1.0f, false, numElements);
+ IIndexBulkLoader builder = component.getBloomFilter().createBuilder(numElements, numHashes);
+
+ IIndexCursor scanCursor = accessor.createSearchCursor();
accessor.search(scanCursor, nullPred);
- LSMBTreeImmutableComponent component = createDiskComponent(componentFactory, flushOp.getFlushTarget(), true);
- IIndexBulkLoader bulkLoader = component.getBTree().createBulkLoader(1.0f, false);
try {
while (scanCursor.hasNext()) {
scanCursor.next();
+ builder.add(scanCursor.getTuple());
bulkLoader.add(scanCursor.getTuple());
}
} finally {
scanCursor.close();
+ builder.end();
}
bulkLoader.end();
return component;
@@ -392,7 +425,7 @@
.getName(), lastFile.getFile().getName());
ILSMIndexAccessorInternal accessor = new LSMBTreeAccessor(lsmHarness, opCtx);
ioScheduler.scheduleOperation(new LSMBTreeMergeOperation(accessor, mergingComponents, cursor, relMergeFileRefs
- .getInsertIndexFileReference(), callback));
+ .getInsertIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), callback));
}
@Override
@@ -401,71 +434,103 @@
LSMBTreeMergeOperation mergeOp = (LSMBTreeMergeOperation) operation;
ITreeIndexCursor cursor = mergeOp.getCursor();
mergedComponents.addAll(mergeOp.getMergingComponents());
- LSMBTreeImmutableComponent mergedBTree = createDiskComponent(componentFactory, mergeOp.getMergeTarget(), true);
- IIndexBulkLoader bulkLoader = mergedBTree.getBTree().createBulkLoader(1.0f, false);
+ // Size hint for the merged BTree/bloom filter: total element count of this operation's inputs only.
+ long numElements = 0L;
+ for (ILSMComponent mergingComponent : mergeOp.getMergingComponents()) {
+ numElements += ((LSMBTreeImmutableComponent) mergingComponent).getBloomFilter().getNumElements();
+ }
+
+ LSMBTreeImmutableComponent mergedComponent = createDiskComponent(componentFactory,
+ mergeOp.getBTreeMergeTarget(), mergeOp.getBloomFilterMergeTarget(), numElements, numHashes, true);
+
+ IIndexBulkLoader bulkLoader = mergedComponent.getBTree().createBulkLoader(1.0f, false, numElements);
+ IIndexBulkLoader builder = mergedComponent.getBloomFilter().createBuilder(numElements, numHashes);
try {
while (cursor.hasNext()) {
cursor.next();
ITupleReference frameTuple = cursor.getTuple();
+ builder.add(frameTuple);
bulkLoader.add(frameTuple);
}
} finally {
cursor.close();
+ builder.end();
}
bulkLoader.end();
- return mergedBTree;
+ return mergedComponent;
}
+ /**
+ * Instantiates the disk component backed by {@code btreeFileRef} and {@code bloomFilterFileRef},
+ * creates both files when {@code createComponent} is true, and activates both the BTree and the
+ * bloom filter.
+ *
+ * NOTE(review): {@code numElements} and {@code numHashes} are not used by this method; callers that
+ * build a new bloom filter pass the sizing values to createBuilder(numElements, numHashes) instead.
+ */
private LSMBTreeImmutableComponent createDiskComponent(LSMBTreeImmutableComponentFactory factory,
- FileReference fileRef, boolean createComponent) throws HyracksDataException, IndexException {
+ FileReference btreeFileRef, FileReference bloomFilterFileRef, long numElements, int numHashes,
+ boolean createComponent) throws HyracksDataException, IndexException {
// Create new BTree instance.
LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) factory
- .createLSMComponentInstance(new LSMComponentFileReferences(fileRef, null));
+ .createLSMComponentInstance(new LSMComponentFileReferences(btreeFileRef, null, bloomFilterFileRef));
if (createComponent) {
component.getBTree().create();
+ component.getBloomFilter().create();
}
// BTree will be closed during cleanup of merge().
+ // NOTE(review): the bloom filter is activated here too; confirm it is deactivated wherever the BTree is.
component.getBTree().activate();
+ component.getBloomFilter().activate();
return component;
}
+ /**
+ * Returns a bulk loader that fills a new disk component. {@code numElementsHint} is passed on to both
+ * the component's BTree bulk loader and its bloom filter builder. A HyracksDataException raised while
+ * setting up the loader is wrapped in a TreeIndexException.
+ */
@Override
- public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput) throws TreeIndexException {
- return new LSMBTreeBulkLoader(fillLevel, verifyInput);
+ public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
+ throws TreeIndexException {
+ try {
+ return new LSMBTreeBulkLoader(fillLevel, verifyInput, numElementsHint);
+ } catch (HyracksDataException e) {
+ throw new TreeIndexException(e);
+ }
}
+ /**
+ * Creates and activates a new disk component (BTree plus bloom filter) under fresh flush file names,
+ * to be filled by a bulk load.
+ */
- private ILSMComponent createBulkLoadTarget() throws HyracksDataException, IndexException {
+ private ILSMComponent createBulkLoadTarget(long numElementsHint) throws HyracksDataException, IndexException {
LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
- return createDiskComponent(bulkLoadComponentFactory, componentFileRefs.getInsertIndexFileReference(), true);
+ return createDiskComponent(bulkLoadComponentFactory, componentFileRefs.getInsertIndexFileReference(),
+ componentFileRefs.getBloomFilterFileReference(), numElementsHint, numHashes, true);
}
+ /**
+ * Forces the bloom filter's dirty pages, then the BTree's dirty pages, and only then marks the
+ * component's BTree as valid.
+ */
@Override
public void markAsValid(ILSMComponent lsmComponent) throws HyracksDataException {
- BTree btree = ((LSMBTreeImmutableComponent) lsmComponent).getBTree();
- forceFlushDirtyPages(btree);
- markAsValidInternal(btree);
+ // The order of forcing the dirty page to be flushed is critical. The bloom filter must be always done first.
+ // NOTE(review): presumably so that a BTree found valid during recovery always has a durable bloom filter
+ // buddy; confirm against the file manager's cleanup logic.
+ LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) lsmComponent;
+ int fileId = component.getBloomFilter().getFileId();
+ // NOTE(review): the bloom filter's pages are forced through the BTree's buffer cache, which assumes both
+ // files are registered with the same buffer cache.
+ IBufferCache bufferCache = component.getBTree().getBufferCache();
+ int startPage = 0;
+ int maxPage = component.getBloomFilter().getNumPages();
+ forceFlushDirtyPages(bufferCache, fileId, startPage, maxPage);
+ forceFlushDirtyPages(component.getBTree());
+ markAsValidInternal(component.getBTree());
}
public class LSMBTreeBulkLoader implements IIndexBulkLoader {
private final ILSMComponent component;
private final BTreeBulkLoader bulkLoader;
+ private final IIndexBulkLoader builder;
+ private boolean endCalledBasedOnFailure = false;
- public LSMBTreeBulkLoader(float fillFactor, boolean verifyInput) throws TreeIndexException {
+ public LSMBTreeBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+ throws TreeIndexException, HyracksDataException {
try {
- component = createBulkLoadTarget();
+ component = createBulkLoadTarget(numElementsHint);
} catch (HyracksDataException e) {
throw new TreeIndexException(e);
} catch (IndexException e) {
throw new TreeIndexException(e);
}
bulkLoader = (BTreeBulkLoader) ((LSMBTreeImmutableComponent) component).getBTree().createBulkLoader(
- fillFactor, verifyInput);
+ fillFactor, verifyInput, numElementsHint);
+ builder = ((LSMBTreeImmutableComponent) component).getBloomFilter().createBuilder(numElementsHint,
+ numHashes);
}
@Override
public void add(ITupleReference tuple) throws IndexException, HyracksDataException {
try {
bulkLoader.add(tuple);
+ builder.add(tuple);
} catch (IndexException e) {
handleException();
throw e;
@@ -478,14 +543,21 @@
}
}
- protected void handleException() throws HyracksDataException {
+ protected void handleException() throws HyracksDataException, IndexException {
((LSMBTreeImmutableComponent) component).getBTree().deactivate();
+ if (!endCalledBasedOnFailure) {
+ builder.end();
+ }
+ ((LSMBTreeImmutableComponent) component).getBloomFilter().deactivate();
((LSMBTreeImmutableComponent) component).getBTree().destroy();
+ ((LSMBTreeImmutableComponent) component).getBloomFilter().destroy();
}
@Override
public void end() throws HyracksDataException, IndexException {
bulkLoader.end();
+ builder.end();
+ endCalledBasedOnFailure = true;
lsmHarness.addBulkLoadedComponent(component);
}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
new file mode 100644
index 0000000..e1dbc9e
--- /dev/null
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
@@ -0,0 +1,205 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+/**
+ * File manager for an LSM BTree whose on-disk components each consist of a BTree file and a bloom
+ * filter "buddy" file. Both files of a component share the same base name (the component's
+ * timestamp range) and differ only in their suffix.
+ */
+public class LSMBTreeFileManager extends AbstractLSMIndexFileManager {
+    private static final String BTREE_STRING = "b";
+    private static final String BLOOM_FILTER_STRING = "f";
+
+    private final TreeIndexFactory<? extends ITreeIndex> btreeFactory;
+
+    public LSMBTreeFileManager(IIOManager ioManager, IFileMapProvider fileMapProvider, FileReference file,
+            TreeIndexFactory<? extends ITreeIndex> btreeFactory, int startIODeviceIndex) {
+        super(ioManager, fileMapProvider, file, null, startIODeviceIndex);
+        this.btreeFactory = btreeFactory;
+    }
+
+    /**
+     * Adds to {@code allFiles} every file in this index's directory on {@code dev} that {@code filter}
+     * accepts. When {@code treeFactory} is non-null, a file is added only if it opens as a valid tree
+     * index; otherwise it is deleted.
+     */
+    protected void cleanupAndGetValidFilesInternal(IODeviceHandle dev, FilenameFilter filter,
+            TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<ComparableFileName> allFiles)
+            throws HyracksDataException, IndexException {
+        File dir = new File(dev.getPath(), baseDir);
+        String[] files = dir.list(filter);
+        if (files == null) {
+            // File.list() returns null when the directory does not exist on this device or cannot be read.
+            return;
+        }
+        for (String fileName : files) {
+            File file = new File(dir, fileName);
+            FileReference fileRef = new FileReference(file);
+            if (treeFactory == null || isValidTreeIndex(treeFactory.createIndexInstance(fileRef))) {
+                allFiles.add(new ComparableFileName(fileRef));
+            } else {
+                file.delete();
+            }
+        }
+    }
+
+    @Override
+    public LSMComponentFileReferences getRelFlushFileReference() {
+        Date date = new Date();
+        String ts = formatter.format(date);
+        // Begin timestamp and end timestamp are identical since it is a flush.
+        String baseName = baseDir + ts + SPLIT_STRING + ts;
+        return new LSMComponentFileReferences(createFlushFile(baseName + SPLIT_STRING + BTREE_STRING), null,
+                createFlushFile(baseName + SPLIT_STRING + BLOOM_FILTER_STRING));
+    }
+
+    @Override
+    public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName)
+            throws HyracksDataException {
+        String[] firstTimestampRange = firstFileName.split(SPLIT_STRING);
+        String[] lastTimestampRange = lastFileName.split(SPLIT_STRING);
+        // Get the range of timestamps by taking the earliest and the latest timestamps.
+        String baseName = baseDir + firstTimestampRange[0] + SPLIT_STRING + lastTimestampRange[1];
+        return new LSMComponentFileReferences(createMergeFile(baseName + SPLIT_STRING + BTREE_STRING), null,
+                createMergeFile(baseName + SPLIT_STRING + BLOOM_FILTER_STRING));
+    }
+
+    private static final FilenameFilter btreeFilter = new FilenameFilter() {
+        public boolean accept(File dir, String name) {
+            return !name.startsWith(".") && name.endsWith(BTREE_STRING);
+        }
+    };
+
+    private static final FilenameFilter bloomFilterFilter = new FilenameFilter() {
+        public boolean accept(File dir, String name) {
+            return !name.startsWith(".") && name.endsWith(BLOOM_FILTER_STRING);
+        }
+    };
+
+    /**
+     * Deletes invalid and orphaned component files and returns the file references of the valid
+     * components, newest first. A component is kept if its BTree is valid, it has a bloom filter buddy,
+     * and its timestamp interval is not contained in that of an earlier-sorted valid component.
+     */
+    @Override
+    public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException, IndexException {
+        List<LSMComponentFileReferences> validFiles = new ArrayList<LSMComponentFileReferences>();
+
+        // Gather the files from all IODeviceHandles before matching, so that a BTree is paired with its
+        // bloom filter regardless of which device either file resides on.
+        ArrayList<ComparableFileName> allBloomFilterFiles = new ArrayList<ComparableFileName>();
+        ArrayList<ComparableFileName> validBTreeCandidates = new ArrayList<ComparableFileName>();
+        for (IODeviceHandle dev : ioManager.getIODevices()) {
+            cleanupAndGetValidFilesInternal(dev, bloomFilterFilter, null, allBloomFilterFiles);
+            cleanupAndGetValidFilesInternal(dev, btreeFilter, btreeFactory, validBTreeCandidates);
+        }
+
+        // Bloom filters not (yet) claimed by a valid BTree, keyed by the component base name.
+        Map<String, ComparableFileName> unmatchedBloomFilters = new HashMap<String, ComparableFileName>();
+        for (ComparableFileName cmpFileName : allBloomFilterFiles) {
+            unmatchedBloomFilters.put(getComponentBaseName(cmpFileName), cmpFileName);
+        }
+
+        // Keep the valid BTrees that have a bloom filter buddy; delete the ones that do not.
+        ArrayList<ComparableFileName> allBTreeFiles = new ArrayList<ComparableFileName>();
+        Map<String, ComparableFileName> bloomFilterBuddies = new HashMap<String, ComparableFileName>();
+        for (ComparableFileName cmpFileName : validBTreeCandidates) {
+            String baseName = getComponentBaseName(cmpFileName);
+            ComparableFileName buddy = unmatchedBloomFilters.remove(baseName);
+            if (buddy != null) {
+                allBTreeFiles.add(cmpFileName);
+                bloomFilterBuddies.put(baseName, buddy);
+            } else {
+                new File(cmpFileName.fullPath).delete();
+            }
+        }
+
+        // The remaining bloom filters belong to no valid BTree (e.g. a crash after the bloom filter was
+        // written but before its BTree was marked valid). Delete them instead of aborting recovery
+        // because the numbers of BTree and bloom filter files differ.
+        for (ComparableFileName orphan : unmatchedBloomFilters.values()) {
+            new File(orphan.fullPath).delete();
+        }
+
+        if (allBTreeFiles.isEmpty()) {
+            return validFiles;
+        }
+
+        // Sorts files names from earliest to latest timestamp.
+        Collections.sort(allBTreeFiles);
+
+        // A buddy bloom filter shares its BTree's base name and therefore its timestamp interval, so
+        // comparing the BTree intervals alone is sufficient.
+        List<ComparableFileName> validComparableBTreeFiles = new ArrayList<ComparableFileName>();
+        ComparableFileName lastBTree = allBTreeFiles.get(0);
+        validComparableBTreeFiles.add(lastBTree);
+        for (int i = 1; i < allBTreeFiles.size(); i++) {
+            ComparableFileName currentBTree = allBTreeFiles.get(i);
+            if (currentBTree.interval[0].compareTo(lastBTree.interval[1]) > 0) {
+                // Current start timestamp is greater than last stop timestamp.
+                validComparableBTreeFiles.add(currentBTree);
+                lastBTree = currentBTree;
+            } else if (currentBTree.interval[0].compareTo(lastBTree.interval[0]) >= 0
+                    && currentBTree.interval[1].compareTo(lastBTree.interval[1]) <= 0) {
+                // Invalid files are completely contained in last interval: delete the BTree and its buddy.
+                new File(currentBTree.fullPath).delete();
+                new File(bloomFilterBuddies.get(getComponentBaseName(currentBTree)).fullPath).delete();
+            } else {
+                // This scenario should not be possible.
+                throw new HyracksDataException("Found LSM files with overlapping but not contained timetamp intervals.");
+            }
+        }
+
+        // Sort valid files in reverse lexicographical order, such that newer files come first.
+        Collections.sort(validComparableBTreeFiles, recencyCmp);
+        for (ComparableFileName cmpBTreeFileName : validComparableBTreeFiles) {
+            ComparableFileName cmpBloomFilterFileName = bloomFilterBuddies.get(getComponentBaseName(cmpBTreeFileName));
+            validFiles.add(new LSMComponentFileReferences(cmpBTreeFileName.fileRef, null,
+                    cmpBloomFilterFileName.fileRef));
+        }
+        return validFiles;
+    }
+
+    /**
+     * Returns the file name with its last SPLIT_STRING-separated suffix removed, i.e. the part that a
+     * BTree file and its bloom filter buddy have in common.
+     */
+    private String getComponentBaseName(ComparableFileName cmpFileName) {
+        int index = cmpFileName.fileName.lastIndexOf(SPLIT_STRING);
+        return cmpFileName.fileName.substring(0, index);
+    }
+}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
new file mode 100644
index 0000000..dfda07b
--- /dev/null
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
@@ -0,0 +1,71 @@
+package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
+
+/**
+ * {@link ILSMIOOperation} that flushes an LSM-BTree component to a BTree file and a bloom filter file.
+ * {@link #perform()} hands this operation to the accessor's {@code flush} method; the operation exposes the
+ * flushing component and both flush targets through its getters.
+ */
+public class LSMBTreeFlushOperation implements ILSMIOOperation {
+
+ private final ILSMIndexAccessorInternal accessor;
+ private final ILSMComponent flushingComponent;
+ // Destination file of the flushed BTree.
+ private final FileReference btreeFlushTarget;
+ // Destination file of the flushed bloom filter.
+ private final FileReference bloomFilterFlushTarget;
+ private final ILSMIOOperationCallback callback;
+
+ /**
+ * @param accessor accessor whose {@code flush} method performs this operation
+ * @param flushingComponent the component being flushed
+ * @param btreeFlushTarget destination file of the BTree
+ * @param bloomFilterFlushTarget destination file of the bloom filter
+ * @param callback callback returned by {@link #getCallback()}
+ */
+ public LSMBTreeFlushOperation(ILSMIndexAccessorInternal accessor, ILSMComponent flushingComponent,
+ FileReference btreeFlushTarget, FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback) {
+ this.accessor = accessor;
+ this.flushingComponent = flushingComponent;
+ this.btreeFlushTarget = btreeFlushTarget;
+ this.bloomFilterFlushTarget = bloomFilterFlushTarget;
+ this.callback = callback;
+ }
+
+ /** Returns an empty set: this operation declares no read devices. */
+ @Override
+ public Set<IODeviceHandle> getReadDevices() {
+ return Collections.emptySet();
+ }
+
+ /**
+ * Returns the devices of the BTree and bloom filter flush targets; the set holds a single entry when both
+ * targets are on the same device.
+ */
+ @Override
+ public Set<IODeviceHandle> getWriteDevices() {
+ Set<IODeviceHandle> devs = new HashSet<IODeviceHandle>();
+ devs.add(btreeFlushTarget.getDeviceHandle());
+ devs.add(bloomFilterFlushTarget.getDeviceHandle());
+ return devs;
+ }
+
+ /** Performs the flush by passing this operation to {@code accessor.flush}. */
+ @Override
+ public void perform() throws HyracksDataException, IndexException {
+ accessor.flush(this);
+ }
+
+ @Override
+ public ILSMIOOperationCallback getCallback() {
+ return callback;
+ }
+
+ /** Returns the destination file of the flushed BTree. */
+ public FileReference getBTreeFlushTarget() {
+ return btreeFlushTarget;
+ }
+
+ /** Returns the destination file of the flushed bloom filter. */
+ public FileReference getBloomFilterFlushTarget() {
+ return bloomFilterFlushTarget;
+ }
+
+ public ILSMIndexAccessorInternal getAccessor() {
+ return accessor;
+ }
+
+ public ILSMComponent getFlushingComponent() {
+ return flushingComponent;
+ }
+}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponent.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponent.java
index 2251a49..7b5e95d 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponent.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponent.java
@@ -1,24 +1,33 @@
package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilter;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractImmutableLSMComponent;
+/**
+ * Immutable LSM-BTree component made of a BTree and its bloom filter.
+ */
public class LSMBTreeImmutableComponent extends AbstractImmutableLSMComponent {
private final BTree btree;
+ private final BloomFilter bloomFilter;
- public LSMBTreeImmutableComponent(BTree btree) {
+ public LSMBTreeImmutableComponent(BTree btree, BloomFilter bloomFilter) {
this.btree = btree;
+ this.bloomFilter = bloomFilter;
}
+ /**
+ * Deactivates and destroys the BTree, then deactivates and destroys the bloom filter.
+ */
@Override
public void destroy() throws HyracksDataException {
+ // NOTE(review): if the BTree deactivate()/destroy() throws, the bloom filter is never deactivated or
+ // destroyed; consider moving the bloom filter teardown into a finally block.
btree.deactivate();
btree.destroy();
+ bloomFilter.deactivate();
+ bloomFilter.destroy();
}
public BTree getBTree() {
return btree;
}
+
+ public BloomFilter getBloomFilter() {
+ return bloomFilter;
+ }
}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponentFactory.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponentFactory.java
index 696fc2c..998072f 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponentFactory.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponentFactory.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -25,14 +27,18 @@
public class LSMBTreeImmutableComponentFactory implements ILSMComponentFactory {
private final TreeIndexFactory<BTree> btreeFactory;
+ private final BloomFilterFactory bloomFilterFactory;
- public LSMBTreeImmutableComponentFactory(TreeIndexFactory<BTree> btreeFactory) {
+ /**
+ * @param btreeFactory creates the BTree of each component
+ * @param bloomFilterFactory creates the bloom filter of each component
+ */
+ public LSMBTreeImmutableComponentFactory(TreeIndexFactory<BTree> btreeFactory, BloomFilterFactory bloomFilterFactory) {
this.btreeFactory = btreeFactory;
+ this.bloomFilterFactory = bloomFilterFactory;
}
+ /**
+ * Creates a component whose BTree is opened on the insert-index file of {@code cfr} and whose bloom filter is
+ * opened on the bloom filter file of {@code cfr}; the delete-index reference is not used.
+ */
@Override
- public ILSMComponent createLSMComponentInstance(LSMComponentFileReferences cfr) throws IndexException {
- return new LSMBTreeImmutableComponent(btreeFactory.createIndexInstance(cfr.getInsertIndexFileReference()));
+ public ILSMComponent createLSMComponentInstance(LSMComponentFileReferences cfr) throws IndexException,
+ HyracksDataException {
+ return new LSMBTreeImmutableComponent(btreeFactory.createIndexInstance(cfr.getInsertIndexFileReference()),
+ bloomFilterFactory.createBloomFiltertInstance(cfr.getBloomFilterFileReference()));
}
@Override
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
index a3a7097..ddfe365 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
@@ -15,7 +15,6 @@
package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -35,15 +34,18 @@
private final ILSMIndexAccessorInternal accessor;
private final List<ILSMComponent> mergingComponents;
private final ITreeIndexCursor cursor;
- private final FileReference mergeTarget;
+ private final FileReference btreeMergeTarget;
+ private final FileReference bloomFilterMergeTarget;
private final ILSMIOOperationCallback callback;
+ /**
+ * @param accessor accessor that performs the merge
+ * @param mergingComponents the components being merged
+ * @param cursor cursor returned by {@code getCursor()}
+ * @param btreeMergeTarget destination file of the merged BTree
+ * @param bloomFilterMergeTarget destination file of the merged bloom filter
+ * @param callback callback returned by {@code getCallback()}
+ */
public LSMBTreeMergeOperation(ILSMIndexAccessorInternal accessor, List<ILSMComponent> mergingComponents,
- ITreeIndexCursor cursor, FileReference mergeTarget, ILSMIOOperationCallback callback) {
+ ITreeIndexCursor cursor, FileReference btreeMergeTarget, FileReference bloomFilterMergeTarget,
+ ILSMIOOperationCallback callback) {
this.accessor = accessor;
this.mergingComponents = mergingComponents;
this.cursor = cursor;
- this.mergeTarget = mergeTarget;
+ this.btreeMergeTarget = btreeMergeTarget;
+ this.bloomFilterMergeTarget = bloomFilterMergeTarget;
this.callback = callback;
}
@@ -59,7 +61,10 @@
+ /**
+ * Returns the devices of the BTree and bloom filter merge targets; the set holds a single entry when both
+ * targets are on the same device.
+ */
@Override
public Set<IODeviceHandle> getWriteDevices() {
- return Collections.singleton(mergeTarget.getDeviceHandle());
+ Set<IODeviceHandle> devs = new HashSet<IODeviceHandle>();
+ devs.add(btreeMergeTarget.getDeviceHandle());
+ devs.add(bloomFilterMergeTarget.getDeviceHandle());
+ return devs;
}
@Override
@@ -72,8 +77,12 @@
return callback;
}
- public FileReference getMergeTarget() {
- return mergeTarget;
+ /** Returns the destination file of the merged BTree. */
+ public FileReference getBTreeMergeTarget() {
+ return btreeMergeTarget;
+ }
+
+ /** Returns the destination file of the merged bloom filter. */
+ public FileReference getBloomFilterMergeTarget() {
+ return bloomFilterMergeTarget;
}
public ITreeIndexCursor getCursor() {
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java
index 3706230..154ecf6 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java
@@ -19,6 +19,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
@@ -29,6 +30,7 @@
import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.freepage.LinkedListFreePageManagerFactory;
import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTreeFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.btree.tuples.LSMBTreeCopyTupleWriterFactory;
import edu.uci.ics.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleWriterFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IInMemoryBufferCache;
@@ -38,7 +40,6 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -80,13 +81,20 @@
typeTraits.length);
TreeIndexFactory<BTree> bulkLoadBTreeFactory = new BTreeFactory(diskBufferCache, diskFileMapProvider,
freePageManagerFactory, interiorFrameFactory, insertLeafFrameFactory, cmpFactories, typeTraits.length);
- ILSMIndexFileManager fileNameManager = new LSMIndexFileManager(ioManager, diskFileMapProvider, file,
+
+ int keyFields[] = new int[cmpFactories.length];
+ for (int i = 0; i < cmpFactories.length; i++) {
+ keyFields[i] = i;
+ }
+ BloomFilterFactory bloomFilterFactory = new BloomFilterFactory(diskBufferCache, diskFileMapProvider, keyFields);
+
+ ILSMIndexFileManager fileNameManager = new LSMBTreeFileManager(ioManager, diskFileMapProvider, file,
diskBTreeFactory, startIODeviceIndex);
LSMBTree lsmTree = new LSMBTree(memBufferCache, memFreePageManager, interiorFrameFactory,
insertLeafFrameFactory, deleteLeafFrameFactory, fileNameManager, diskBTreeFactory,
- bulkLoadBTreeFactory, diskFileMapProvider, typeTraits.length, cmpFactories, mergePolicy,
- opTrackerFactory, ioScheduler, ioOpCallbackProvider);
+ bulkLoadBTreeFactory, bloomFilterFactory, diskFileMapProvider, typeTraits.length, cmpFactories,
+ mergePolicy, opTrackerFactory, ioScheduler, ioOpCallbackProvider);
return lsmTree;
}
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponentFactory.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponentFactory.java
index d00b805..1f3a2b7 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponentFactory.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponentFactory.java
@@ -1,11 +1,13 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+/**
+ * Creates LSM component instances from component file references.
+ */
public interface ILSMComponentFactory {
- public ILSMComponent createLSMComponentInstance(LSMComponentFileReferences cfr) throws IndexException;
+ /**
+ * Creates an LSM component instance over the files named by {@code cfr}.
+ *
+ * @param cfr file references of the component; references a component type does not use may be null
+ */
+ public ILSMComponent createLSMComponentInstance(LSMComponentFileReferences cfr) throws IndexException,
+ HyracksDataException;
+ /** Returns the buffer cache associated with this factory. */
public IBufferCache getBufferCache();
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index 3d2ebcd..e2bd77c 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -48,7 +48,6 @@
protected final IFileMapProvider fileMapProvider;
// baseDir should reflect dataset name and partition name.
- protected FileReference file;
protected String baseDir;
protected final Format formatter = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS");
protected final Comparator<String> cmp = new FileNameComparator();
@@ -61,7 +60,6 @@
public AbstractLSMIndexFileManager(IIOManager ioManager, IFileMapProvider fileMapProvider, FileReference file,
TreeIndexFactory<? extends ITreeIndex> treeFactory, int startIODeviceIndex) {
- this.file = file;
this.baseDir = file.getFile().getPath();
if (!baseDir.endsWith(System.getProperty("file.separator"))) {
baseDir += System.getProperty("file.separator");
@@ -100,9 +98,21 @@
}
}
- abstract protected void cleanupAndGetValidFilesInternal(IODeviceHandle dev, FilenameFilter filter,
+ /**
+ * Adds to {@code allFiles} every file in this index's directory on {@code dev} that passes {@code filter} and
+ * holds a valid tree index; files that fail the validity check are deleted.
+ *
+ * @throws HyracksDataException if the directory exists but its contents cannot be listed
+ */
+ protected void cleanupAndGetValidFilesInternal(IODeviceHandle dev, FilenameFilter filter,
TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<ComparableFileName> allFiles)
- throws HyracksDataException, IndexException;
+ throws HyracksDataException, IndexException {
+ File dir = new File(dev.getPath(), baseDir);
+ String[] files = dir.list(filter);
+ if (files == null) {
+ // File.list() returns null rather than an empty array when the directory is missing or cannot be read.
+ if (!dir.exists()) {
+ // No index directory on this device, so there is nothing to collect.
+ return;
+ }
+ throw new HyracksDataException("Failed to list files in " + dir.getPath());
+ }
+ for (String fileName : files) {
+ File file = new File(dir, fileName);
+ FileReference fileRef = new FileReference(file);
+ if (isValidTreeIndex(treeFactory.createIndexInstance(fileRef))) {
+ allFiles.add(new ComparableFileName(fileRef));
+ } else {
+ // Best-effort removal; the result of delete() is intentionally not checked.
+ file.delete();
+ }
+ }
+ }
@Override
public void createDirs() {
@@ -145,7 +155,7 @@
Date date = new Date();
String ts = formatter.format(date);
// Begin timestamp and end timestamp are identical since it is a flush
- return new LSMComponentFileReferences(createFlushFile(baseDir + ts + SPLIT_STRING + ts), null);
+ return new LSMComponentFileReferences(createFlushFile(baseDir + ts + SPLIT_STRING + ts), null, null);
}
@Override
@@ -155,7 +165,7 @@
String[] lastTimestampRange = lastFileName.split(SPLIT_STRING);
// Get the range of timestamps by taking the earliest and the latest timestamps
return new LSMComponentFileReferences(createMergeFile(baseDir + firstTimestampRange[0] + SPLIT_STRING
- + lastTimestampRange[1]), null);
+ + lastTimestampRange[1]), null, null);
}
@Override
@@ -177,7 +187,7 @@
}
if (allFiles.size() == 1) {
- validFiles.add(new LSMComponentFileReferences(allFiles.get(0).fileRef, null));
+ validFiles.add(new LSMComponentFileReferences(allFiles.get(0).fileRef, null, null));
return validFiles;
}
@@ -209,7 +219,7 @@
// Sort valid files in reverse lexicographical order, such that newer files come first.
Collections.sort(validComparableFiles, recencyCmp);
for (ComparableFileName cmpFileName : validComparableFiles) {
- validFiles.add(new LSMComponentFileReferences(cmpFileName.fileRef, null));
+ validFiles.add(new LSMComponentFileReferences(cmpFileName.fileRef, null, null));
}
return validFiles;
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java
index ac6ddcf..019dca4 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java
@@ -24,9 +24,14 @@
// This FileReference for the delete index (if any). For example, this will be the the FileReference of the buddy BTree in one component of the LSM-RTree.
private final FileReference deleteIndexFileReference;
- public LSMComponentFileReferences(FileReference insertIndexFileReference, FileReference deleteIndexFileReference) {
+ // The FileReference for the bloom filter (if any); null when the component has no bloom filter.
+ private final FileReference bloomFilterFileReference;
+
+ /**
+ * @param insertIndexFileReference file of the component's insert index
+ * @param deleteIndexFileReference file of the component's delete index, or null if it has none
+ * @param bloomFilterFileReference file of the component's bloom filter, or null if it has none
+ */
+ public LSMComponentFileReferences(FileReference insertIndexFileReference, FileReference deleteIndexFileReference,
+ FileReference bloomFilterFileReference) {
this.insertIndexFileReference = insertIndexFileReference;
this.deleteIndexFileReference = deleteIndexFileReference;
+ this.bloomFilterFileReference = bloomFilterFileReference;
}
public FileReference getInsertIndexFileReference() {
@@ -36,4 +41,8 @@
public FileReference getDeleteIndexFileReference() {
return deleteIndexFileReference;
}
+
+ /** Returns the bloom filter file reference passed to the constructor, which may be null. */
+ public FileReference getBloomFilterFileReference() {
+ return bloomFilterFileReference;
+ }
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMFlushOperation.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMFlushOperation.java
deleted file mode 100644
index 6ce1f08..0000000
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMFlushOperation.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-
-import java.util.Collections;
-import java.util.Set;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.io.IODeviceHandle;
-import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
-
-public class LSMFlushOperation implements ILSMIOOperation {
-
- private final ILSMIndexAccessorInternal accessor;
- private final ILSMComponent flushingComponent;
- private final FileReference flushTarget;
- private final ILSMIOOperationCallback callback;
-
- public LSMFlushOperation(ILSMIndexAccessorInternal accessor, ILSMComponent flushingComponent,
- FileReference flushTarget, ILSMIOOperationCallback callback) {
- this.accessor = accessor;
- this.flushingComponent = flushingComponent;
- this.flushTarget = flushTarget;
- this.callback = callback;
- }
-
- @Override
- public Set<IODeviceHandle> getReadDevices() {
- return Collections.emptySet();
- }
-
- @Override
- public Set<IODeviceHandle> getWriteDevices() {
- return Collections.singleton(flushTarget.getDeviceHandle());
- }
-
- @Override
- public void perform() throws HyracksDataException, IndexException {
- accessor.flush(this);
- }
-
- @Override
- public ILSMIOOperationCallback getCallback() {
- return callback;
- }
-
- public FileReference getFlushTarget() {
- return flushTarget;
- }
-
- public ILSMIndexAccessorInternal getAccessor() {
- return accessor;
- }
-
- public ILSMComponent getFlushingComponent() {
- return flushingComponent;
- }
-}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexFileManager.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexFileManager.java
deleted file mode 100644
index a7a16ac..0000000
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexFileManager.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.util.ArrayList;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.io.IIOManager;
-import edu.uci.ics.hyracks.api.io.IODeviceHandle;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
-import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
-
-public class LSMIndexFileManager extends AbstractLSMIndexFileManager {
-
- public LSMIndexFileManager(IIOManager ioManager, IFileMapProvider fileMapProvider, FileReference file,
- TreeIndexFactory<? extends ITreeIndex> treeFactory, int startIODeviceIndex) {
- super(ioManager, fileMapProvider, file, treeFactory, startIODeviceIndex);
- }
-
- protected void cleanupAndGetValidFilesInternal(IODeviceHandle dev, FilenameFilter filter,
- TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<ComparableFileName> allFiles)
- throws HyracksDataException, IndexException {
- File dir = new File(dev.getPath(), baseDir);
- String[] files = dir.list(filter);
- for (String fileName : files) {
- File file = new File(dir.getPath() + File.separator + fileName);
- FileReference fileRef = new FileReference(file);
- if (isValidTreeIndex(treeFactory.createIndexInstance(fileRef))) {
- allFiles.add(new ComparableFileName(fileRef));
- } else {
- file.delete();
- }
- }
- }
-}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexBulkLoadOperatorDescriptor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexBulkLoadOperatorDescriptor.java
index 6b07608..da3cad5 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexBulkLoadOperatorDescriptor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexBulkLoadOperatorDescriptor.java
@@ -37,13 +37,14 @@
private final int[] fieldPermutation;
private final boolean verifyInput;
+ private final long numElementsHint;
public LSMInvertedIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] fieldPermutation,
- boolean verifyInput, IStorageManagerInterface storageManager, IFileSplitProvider fileSplitProvider,
- IIndexLifecycleManagerProvider lifecycleManagerProvider, ITypeTraits[] tokenTypeTraits,
- IBinaryComparatorFactory[] tokenComparatorFactories, ITypeTraits[] invListsTypeTraits,
- IBinaryComparatorFactory[] invListComparatorFactories, IBinaryTokenizerFactory tokenizerFactory,
- IIndexDataflowHelperFactory invertedIndexDataflowHelperFactory,
+ boolean verifyInput, long numElementsHint, IStorageManagerInterface storageManager,
+ IFileSplitProvider fileSplitProvider, IIndexLifecycleManagerProvider lifecycleManagerProvider,
+ ITypeTraits[] tokenTypeTraits, IBinaryComparatorFactory[] tokenComparatorFactories,
+ ITypeTraits[] invListsTypeTraits, IBinaryComparatorFactory[] invListComparatorFactories,
+ IBinaryTokenizerFactory tokenizerFactory, IIndexDataflowHelperFactory invertedIndexDataflowHelperFactory,
IModificationOperationCallbackFactory modificationOpCallbackFactory) {
super(spec, 1, 0, null, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
@@ -51,12 +52,13 @@
NoOpOperationCallbackFactory.INSTANCE, modificationOpCallbackFactory);
this.fieldPermutation = fieldPermutation;
this.verifyInput = verifyInput;
+ this.numElementsHint = numElementsHint;
}
+ /**
+ * Creates the bulk-load runtime for one partition with a fill factor of 1.0, forwarding this descriptor's
+ * field permutation, verifyInput flag and numElementsHint.
+ */
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
return new IndexBulkLoadOperatorNodePushable(this, ctx, partition, fieldPermutation, 1.0f, verifyInput,
- recordDescProvider);
+ numElementsHint, recordDescProvider);
}
}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index da6bc28..159352f 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -414,7 +414,7 @@
memBTreeAccessor.search(scanCursor, nullPred);
// Bulk load the disk inverted index from the in-memory inverted index.
- IIndexBulkLoader invIndexBulkLoader = diskInvertedIndex.createBulkLoader(1.0f, false);
+ IIndexBulkLoader invIndexBulkLoader = diskInvertedIndex.createBulkLoader(1.0f, false, 0L);
try {
while (scanCursor.hasNext()) {
scanCursor.next();
@@ -435,7 +435,7 @@
deletedKeysBTreeAccessor.search(deletedKeysScanCursor, nullPred);
// Bulk load the deleted-keys BTree.
- IIndexBulkLoader deletedKeysBTreeBulkLoader = diskDeletedKeysBTree.createBulkLoader(1.0f, false);
+ IIndexBulkLoader deletedKeysBTreeBulkLoader = diskDeletedKeysBTree.createBulkLoader(1.0f, false, 0L);
try {
while (deletedKeysScanCursor.hasNext()) {
deletedKeysScanCursor.next();
@@ -490,7 +490,7 @@
IInvertedIndex mergedDiskInvertedIndex = component.getInvIndex();
IIndexCursor cursor = mergeOp.getCursor();
- IIndexBulkLoader invIndexBulkLoader = mergedDiskInvertedIndex.createBulkLoader(1.0f, true);
+ IIndexBulkLoader invIndexBulkLoader = mergedDiskInvertedIndex.createBulkLoader(1.0f, true, 0L);
try {
while (cursor.hasNext()) {
cursor.next();
@@ -518,15 +518,17 @@
}
+ /**
+ * Creates a bulk loader for this LSM inverted index.
+ *
+ * @param numElementsHint element-count hint forwarded to {@link LSMInvertedIndexBulkLoader}
+ */
@Override
- public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput) throws IndexException {
- return new LSMInvertedIndexBulkLoader(fillFactor, verifyInput);
+ public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+ throws IndexException {
+ return new LSMInvertedIndexBulkLoader(fillFactor, verifyInput, numElementsHint);
}
public class LSMInvertedIndexBulkLoader implements IIndexBulkLoader {
private final ILSMComponent component;
private final IIndexBulkLoader invIndexBulkLoader;
- public LSMInvertedIndexBulkLoader(float fillFactor, boolean verifyInput) throws IndexException {
+ public LSMInvertedIndexBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+ throws IndexException {
// Note that by using a flush target file name, we state that the
// new bulk loaded tree is "newer" than any other merged tree.
try {
@@ -537,7 +539,7 @@
throw new TreeIndexException(e);
}
invIndexBulkLoader = ((LSMInvertedIndexImmutableComponent) component).getInvIndex().createBulkLoader(
- fillFactor, verifyInput);
+ fillFactor, verifyInput, numElementsHint);
}
@Override
@@ -580,7 +582,7 @@
FileReference dictBTreeFileRef, FileReference btreeFileRef, boolean create) throws HyracksDataException,
IndexException {
LSMInvertedIndexImmutableComponent component = (LSMInvertedIndexImmutableComponent) factory
- .createLSMComponentInstance(new LSMComponentFileReferences(dictBTreeFileRef, btreeFileRef));
+ .createLSMComponentInstance(new LSMComponentFileReferences(dictBTreeFileRef, btreeFileRef, null));
if (create) {
component.getInvIndex().create();
component.getDeletedKeysBTree().create();
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
index 8ffb0bd..21a11dc 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
@@ -29,14 +29,14 @@
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.api.io.IODeviceHandle;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexFileNameMapper;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
// TODO: Refactor for better code sharing with other file managers.
-public class LSMInvertedIndexFileManager extends LSMIndexFileManager implements IInvertedIndexFileNameMapper {
+public class LSMInvertedIndexFileManager extends AbstractLSMIndexFileManager implements IInvertedIndexFileNameMapper {
private static final String DICT_BTREE_SUFFIX = "b";
private static final String INVLISTS_SUFFIX = "i";
private static final String DELETED_KEYS_BTREE_SUFFIX = "d";
@@ -69,7 +69,7 @@
String baseName = baseDir + ts + SPLIT_STRING + ts;
// Begin timestamp and end timestamp are identical since it is a flush
return new LSMComponentFileReferences(createFlushFile(baseName + SPLIT_STRING + DICT_BTREE_SUFFIX),
- createFlushFile(baseName + SPLIT_STRING + DELETED_KEYS_BTREE_SUFFIX));
+ createFlushFile(baseName + SPLIT_STRING + DELETED_KEYS_BTREE_SUFFIX), null);
}
@Override
@@ -81,7 +81,7 @@
String baseName = baseDir + firstTimestampRange[0] + SPLIT_STRING + lastTimestampRange[1];
// Get the range of timestamps by taking the earliest and the latest timestamps
return new LSMComponentFileReferences(createMergeFile(baseName + SPLIT_STRING + DICT_BTREE_SUFFIX),
- createMergeFile(baseName + SPLIT_STRING + DELETED_KEYS_BTREE_SUFFIX));
+ createMergeFile(baseName + SPLIT_STRING + DELETED_KEYS_BTREE_SUFFIX), null);
}
@Override
@@ -132,7 +132,7 @@
if (allDictBTreeFiles.size() == 1 && allDeletedKeysBTreeFiles.size() == 1) {
validFiles.add(new LSMComponentFileReferences(allDictBTreeFiles.get(0).fileRef, allDeletedKeysBTreeFiles
- .get(0).fileRef));
+ .get(0).fileRef, null));
return validFiles;
}
@@ -183,7 +183,8 @@
while (dictBTreeFileIter.hasNext() && deletedKeysBTreeIter.hasNext()) {
ComparableFileName cmpDictBTreeFile = dictBTreeFileIter.next();
ComparableFileName cmpDeletedKeysBTreeFile = deletedKeysBTreeIter.next();
- validFiles.add(new LSMComponentFileReferences(cmpDictBTreeFile.fileRef, cmpDeletedKeysBTreeFile.fileRef));
+ validFiles.add(new LSMComponentFileReferences(cmpDictBTreeFile.fileRef, cmpDeletedKeysBTreeFile.fileRef,
+ null));
}
return validFiles;
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
index 668250c..d5a074e 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
@@ -191,7 +191,8 @@
}
@Override
- public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput) throws IndexException {
+ public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+ throws IndexException {
throw new UnsupportedOperationException("Bulk load not supported by in-memory inverted index.");
}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index f1552eb..afeaf90 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -76,7 +76,7 @@
protected final int invListEndPageIdField;
protected final int invListStartOffField;
protected final int invListNumElementsField;
-
+
// Type traits to be appended to the token type trait which finally form the BTree field type traits.
protected static final ITypeTraits[] btreeValueTypeTraits = new ITypeTraits[4];
static {
@@ -283,7 +283,7 @@
btreeTuple.getFieldStart(invListNumElementsField));
listCursor.reset(startPageId, endPageId, startOff, numElements);
}
-
+
public final class OnDiskInvertedIndexBulkLoader implements IIndexBulkLoader {
private final ArrayTupleBuilder btreeTupleBuilder;
private final ArrayTupleReference btreeTupleReference;
@@ -302,8 +302,8 @@
private final boolean verifyInput;
private final MultiComparator allCmp;
- public OnDiskInvertedIndexBulkLoader(float btreeFillFactor, boolean verifyInput, int startPageId, int fileId)
- throws IndexException, HyracksDataException {
+ public OnDiskInvertedIndexBulkLoader(float btreeFillFactor, boolean verifyInput, long numElementsHint,
+ int startPageId, int fileId) throws IndexException, HyracksDataException {
this.verifyInput = verifyInput;
this.tokenCmp = MultiComparator.create(btree.getComparatorFactories());
this.invListCmp = MultiComparator.create(invListCmpFactories);
@@ -316,7 +316,7 @@
this.btreeTupleReference = new ArrayTupleReference();
this.lastTupleBuilder = new ArrayTupleBuilder(numTokenFields + numInvListKeys);
this.lastTuple = new ArrayTupleReference();
- this.btreeBulkloader = btree.createBulkLoader(btreeFillFactor, verifyInput);
+ this.btreeBulkloader = btree.createBulkLoader(btreeFillFactor, verifyInput, numElementsHint);
currentPageId = startPageId;
currentPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, currentPageId), true);
currentPage.acquireWriteLatch();
@@ -477,7 +477,7 @@
this.index = index;
this.searcher = searcher;
}
-
+
@Override
public IIndexCursor createSearchCursor() {
return new OnDiskInvertedIndexSearchCursor(searcher, index.getInvListTypeTraits().length);
@@ -563,9 +563,10 @@
}
@Override
- public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput) throws IndexException {
+ public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+ throws IndexException {
try {
- return new OnDiskInvertedIndexBulkLoader(fillFactor, verifyInput, rootPageId, fileId);
+ return new OnDiskInvertedIndexBulkLoader(fillFactor, verifyInput, numElementsHint, rootPageId, fileId);
} catch (HyracksDataException e) {
throw new InvertedIndexException(e);
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index 5a1482f..0e85cfd 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -229,7 +229,7 @@
FileReference deleteFileRef, boolean createComponent) throws HyracksDataException, IndexException {
// Create new tree instance.
LSMRTreeImmutableComponent component = (LSMRTreeImmutableComponent) factory
- .createLSMComponentInstance(new LSMComponentFileReferences(insertFileRef, deleteFileRef));
+ .createLSMComponentInstance(new LSMComponentFileReferences(insertFileRef, deleteFileRef, null));
if (createComponent) {
component.getRTree().create();
if (component.getBTree() != null) {
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index e43e525..ea6548e 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -207,7 +207,7 @@
@Override
public ILSMComponent flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
LSMRTreeFlushOperation flushOp = (LSMRTreeFlushOperation) operation;
- LSMRTreeMutableComponent flushingComponent = flushOp.getFlushingComponent();
+ LSMRTreeMutableComponent flushingComponent = (LSMRTreeMutableComponent) flushOp.getFlushingComponent();
// Renaming order is critical because we use assume ordering when we
// read the file names when we open the tree.
// The RTree should be renamed before the BTree.
@@ -251,7 +251,7 @@
if (!isEmpty) {
rTreeTupleSorter.sort();
}
- rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false);
+ rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false, 0L);
cursor = rTreeTupleSorter;
try {
@@ -274,7 +274,7 @@
BTree diskBTree = component.getBTree();
// BulkLoad the tuples from the in-memory tree into the new disk BTree.
- IIndexBulkLoader bTreeBulkloader = diskBTree.createBulkLoader(1.0f, false);
+ IIndexBulkLoader bTreeBulkloader = diskBTree.createBulkLoader(1.0f, false, 0L);
try {
while (btreeScanCursor.hasNext()) {
btreeScanCursor.next();
@@ -330,7 +330,7 @@
RTree mergedRTree = component.getRTree();
BTree mergedBTree = component.getBTree();
- IIndexBulkLoader bulkloader = mergedRTree.createBulkLoader(1.0f, false);
+ IIndexBulkLoader bulkloader = mergedRTree.createBulkLoader(1.0f, false, 0L);
try {
while (cursor.hasNext()) {
cursor.next();
@@ -343,7 +343,7 @@
bulkloader.end();
// Load an empty BTree tree.
- mergedBTree.createBulkLoader(1.0f, false).end();
+ mergedBTree.createBulkLoader(1.0f, false, 0L).end();
return new LSMRTreeImmutableComponent(mergedRTree, mergedBTree);
}
@@ -377,15 +377,17 @@
}
@Override
- public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput) throws TreeIndexException {
- return new LSMRTreeBulkLoader(fillLevel, verifyInput);
+ public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
+ throws TreeIndexException {
+ return new LSMRTreeBulkLoader(fillLevel, verifyInput, numElementsHint);
}
public class LSMRTreeBulkLoader implements IIndexBulkLoader {
private final ILSMComponent component;
private final IIndexBulkLoader bulkLoader;
- public LSMRTreeBulkLoader(float fillFactor, boolean verifyInput) throws TreeIndexException {
+ public LSMRTreeBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+ throws TreeIndexException {
// Note that by using a flush target file name, we state that the
// new bulk loaded tree is "newer" than any other merged tree.
try {
@@ -395,7 +397,8 @@
} catch (IndexException e) {
throw new TreeIndexException(e);
}
- bulkLoader = ((LSMRTreeImmutableComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput);
+ bulkLoader = ((LSMRTreeImmutableComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput,
+ numElementsHint);
}
@Override
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
index 7ceecad..041fda1 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
@@ -30,12 +30,12 @@
import edu.uci.ics.hyracks.api.io.IODeviceHandle;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
-public class LSMRTreeFileManager extends LSMIndexFileManager {
+public class LSMRTreeFileManager extends AbstractLSMIndexFileManager {
private static final String RTREE_STRING = "r";
private static final String BTREE_STRING = "b";
@@ -69,7 +69,7 @@
String baseName = baseDir + ts + SPLIT_STRING + ts;
// Begin timestamp and end timestamp are identical since it is a flush
return new LSMComponentFileReferences(createFlushFile(baseName + SPLIT_STRING + RTREE_STRING),
- createFlushFile(baseName + SPLIT_STRING + BTREE_STRING));
+ createFlushFile(baseName + SPLIT_STRING + BTREE_STRING), null);
}
@Override
@@ -81,7 +81,7 @@
String baseName = baseDir + firstTimestampRange[0] + SPLIT_STRING + lastTimestampRange[1];
// Get the range of timestamps by taking the earliest and the latest timestamps
return new LSMComponentFileReferences(createMergeFile(baseName + SPLIT_STRING + RTREE_STRING),
- createMergeFile(baseName + SPLIT_STRING + BTREE_STRING));
+ createMergeFile(baseName + SPLIT_STRING + BTREE_STRING), null);
}
@Override
@@ -127,7 +127,8 @@
}
if (allRTreeFiles.size() == 1 && allBTreeFiles.size() == 1) {
- validFiles.add(new LSMComponentFileReferences(allRTreeFiles.get(0).fileRef, allBTreeFiles.get(0).fileRef));
+ validFiles.add(new LSMComponentFileReferences(allRTreeFiles.get(0).fileRef, allBTreeFiles.get(0).fileRef,
+ null));
return validFiles;
}
@@ -178,7 +179,7 @@
while (rtreeFileIter.hasNext() && btreeFileIter.hasNext()) {
ComparableFileName cmpRTreeFileName = rtreeFileIter.next();
ComparableFileName cmpBTreeFileName = btreeFileIter.next();
- validFiles.add(new LSMComponentFileReferences(cmpRTreeFileName.fileRef, cmpBTreeFileName.fileRef));
+ validFiles.add(new LSMComponentFileReferences(cmpRTreeFileName.fileRef, cmpBTreeFileName.fileRef, null));
}
return validFiles;
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
index 8698a1d..8c5c6fe 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
@@ -8,6 +8,7 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IODeviceHandle;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
@@ -15,12 +16,12 @@
public class LSMRTreeFlushOperation implements ILSMIOOperation {
private final ILSMIndexAccessorInternal accessor;
- private final LSMRTreeMutableComponent flushingComponent;
+ private final ILSMComponent flushingComponent;
private final FileReference rtreeFlushTarget;
private final FileReference btreeFlushTarget;
private final ILSMIOOperationCallback callback;
- public LSMRTreeFlushOperation(ILSMIndexAccessorInternal accessor, LSMRTreeMutableComponent flushingComponent,
+ public LSMRTreeFlushOperation(ILSMIndexAccessorInternal accessor, ILSMComponent flushingComponent,
FileReference rtreeFlushTarget, FileReference btreeFlushTarget, ILSMIOOperationCallback callback) {
this.accessor = accessor;
this.flushingComponent = flushingComponent;
@@ -38,7 +39,9 @@
public Set<IODeviceHandle> getWriteDevices() {
Set<IODeviceHandle> devs = new HashSet<IODeviceHandle>();
devs.add(rtreeFlushTarget.getDeviceHandle());
- devs.add(btreeFlushTarget.getDeviceHandle());
+ if (btreeFlushTarget != null) {
+ devs.add(btreeFlushTarget.getDeviceHandle());
+ }
return devs;
}
@@ -60,7 +63,7 @@
return btreeFlushTarget;
}
- public LSMRTreeMutableComponent getFlushingComponent() {
+ public ILSMComponent getFlushingComponent() {
return flushingComponent;
}
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
index 970b253..d6a8693 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
@@ -1,6 +1,5 @@
package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -40,14 +39,21 @@
for (ILSMComponent o : mergingComponents) {
LSMRTreeImmutableComponent component = (LSMRTreeImmutableComponent) o;
devs.add(component.getRTree().getFileReference().getDeviceHandle());
- devs.add(component.getBTree().getFileReference().getDeviceHandle());
+ if (component.getBTree() != null) {
+ devs.add(component.getBTree().getFileReference().getDeviceHandle());
+ }
}
return devs;
}
@Override
public Set<IODeviceHandle> getWriteDevices() {
- return Collections.singleton(rtreeMergeTarget.getDeviceHandle());
+ Set<IODeviceHandle> devs = new HashSet<IODeviceHandle>();
+ devs.add(rtreeMergeTarget.getDeviceHandle());
+ if (btreeMergeTarget != null) {
+ devs.add(btreeMergeTarget.getDeviceHandle());
+ }
+ return devs;
}
@Override
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 824b8ef..ea3fc46 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -52,7 +52,6 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMFlushOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
@@ -205,13 +204,13 @@
opCtx.setOperation(IndexOperation.FLUSH);
opCtx.getComponentHolder().add(flushingComponent);
ILSMIndexAccessorInternal accessor = new LSMRTreeWithAntiMatterTuplesAccessor(lsmHarness, opCtx);
- ioScheduler.scheduleOperation(new LSMFlushOperation(accessor, flushingComponent, relFlushFileRefs
- .getInsertIndexFileReference(), callback));
+ ioScheduler.scheduleOperation(new LSMRTreeFlushOperation(accessor, flushingComponent, relFlushFileRefs
+ .getInsertIndexFileReference(), null, callback));
}
@Override
public ILSMComponent flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
- LSMFlushOperation flushOp = (LSMFlushOperation) operation;
+ LSMRTreeFlushOperation flushOp = (LSMRTreeFlushOperation) operation;
// Renaming order is critical because we use assume ordering when we
// read the file names when we open the tree.
// The RTree should be renamed before the BTree.
@@ -221,8 +220,8 @@
RTreeSearchCursor rtreeScanCursor = (RTreeSearchCursor) memRTreeAccessor.createSearchCursor();
SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
- LSMRTreeImmutableComponent component = createDiskComponent(componentFactory, flushOp.getFlushTarget(), null,
- true);
+ LSMRTreeImmutableComponent component = createDiskComponent(componentFactory, flushOp.getRTreeFlushTarget(),
+ null, true);
RTree diskRTree = component.getRTree();
// scan the memory BTree
@@ -281,7 +280,7 @@
bTreeTupleSorter.sort();
}
- IIndexBulkLoader rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false);
+ IIndexBulkLoader rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false, 0L);
LSMRTreeFlushCursor cursor = new LSMRTreeFlushCursor(rTreeTupleSorter, bTreeTupleSorter, comparatorFields,
linearizerArray);
cursor.open(null, null);
@@ -337,7 +336,7 @@
LSMRTreeImmutableComponent component = createDiskComponent(componentFactory, mergeOp.getRTreeMergeTarget(),
null, true);
RTree mergedRTree = component.getRTree();
- IIndexBulkLoader bulkloader = mergedRTree.createBulkLoader(1.0f, false);
+ IIndexBulkLoader bulkloader = mergedRTree.createBulkLoader(1.0f, false, 0L);
try {
while (cursor.hasNext()) {
cursor.next();
@@ -374,8 +373,9 @@
}
@Override
- public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput) throws TreeIndexException {
- return new LSMRTreeWithAntiMatterTuplesBulkLoader(fillLevel, verifyInput);
+ public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
+ throws TreeIndexException {
+ return new LSMRTreeWithAntiMatterTuplesBulkLoader(fillLevel, verifyInput, numElementsHint);
}
private ILSMComponent createBulkLoadTarget() throws HyracksDataException, IndexException {
@@ -387,7 +387,8 @@
private final ILSMComponent component;
private final IIndexBulkLoader bulkLoader;
- public LSMRTreeWithAntiMatterTuplesBulkLoader(float fillFactor, boolean verifyInput) throws TreeIndexException {
+ public LSMRTreeWithAntiMatterTuplesBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+ throws TreeIndexException {
// Note that by using a flush target file name, we state that the
// new bulk loaded tree is "newer" than any other merged tree.
try {
@@ -397,7 +398,8 @@
} catch (IndexException e) {
throw new TreeIndexException(e);
}
- bulkLoader = ((LSMRTreeImmutableComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput);
+ bulkLoader = ((LSMRTreeImmutableComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput,
+ numElementsHint);
}
@Override
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFileManager.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFileManager.java
new file mode 100644
index 0000000..10b982f
--- /dev/null
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFileManager.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+/**
+ * File manager for {@link LSMRTreeWithAntiMatterTuples}. Each disk component of that index is a single R-tree
+ * file with no companion B-tree, so every {@link LSMComponentFileReferences} produced here carries the R-tree
+ * file as its first reference and {@code null} for the other two.
+ */
+public class LSMRTreeWithAntiMatterTuplesFileManager extends AbstractLSMIndexFileManager {
+
+    /** R-tree factory handed to {@code cleanupAndGetValidFilesInternal} when scanning on-disk files. */
+    private final TreeIndexFactory<? extends ITreeIndex> rtreeFactory;
+
+    /**
+     * Creates a file manager whose disk components are single R-tree files.
+     *
+     * @param ioManager
+     *            I/O manager; all of its I/O devices are scanned by {@link #cleanupAndGetValidFiles()}
+     * @param fileMapProvider
+     *            passed through to the base class
+     * @param file
+     *            passed through to the base class
+     * @param rtreeFactory
+     *            R-tree factory used when scanning on-disk component files
+     * @param startIODeviceIndex
+     *            passed through to the base class
+     */
+    public LSMRTreeWithAntiMatterTuplesFileManager(IIOManager ioManager, IFileMapProvider fileMapProvider,
+            FileReference file, TreeIndexFactory<? extends ITreeIndex> rtreeFactory, int startIODeviceIndex) {
+        // NOTE(review): the base class presumably expects a tree factory as its fourth argument; null is passed
+        // and the R-tree factory is instead supplied explicitly to cleanupAndGetValidFilesInternal. Confirm the
+        // base class never dereferences it.
+        super(ioManager, fileMapProvider, file, null, startIODeviceIndex);
+        this.rtreeFactory = rtreeFactory;
+    }
+
+    /**
+     * Returns the references for a new flush component. Its file is named {@code ts + SPLIT_STRING + ts}
+     * under {@code baseDir}, where {@code ts} is the current time formatted by {@code formatter}.
+     */
+    @Override
+    public LSMComponentFileReferences getRelFlushFileReference() {
+        Date date = new Date();
+        String ts = formatter.format(date);
+        // Begin timestamp and end timestamp are identical since it is a flush
+        return new LSMComponentFileReferences(createFlushFile(baseDir + ts + SPLIT_STRING + ts), null, null);
+    }
+
+    /**
+     * Returns the references for a merged component whose interval spans from the begin timestamp of
+     * {@code firstFileName} to the end timestamp of {@code lastFileName}.
+     */
+    @Override
+    public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName)
+            throws HyracksDataException {
+        String[] firstTimestampRange = firstFileName.split(SPLIT_STRING);
+        String[] lastTimestampRange = lastFileName.split(SPLIT_STRING);
+        // Get the range of timestamps by taking the earliest and the latest timestamps
+        return new LSMComponentFileReferences(createMergeFile(baseDir + firstTimestampRange[0] + SPLIT_STRING
+                + lastTimestampRange[1]), null, null);
+    }
+
+    /** Accepts every file whose name does not start with '.', i.e. hidden files are skipped. */
+    private static final FilenameFilter fileNameFilter = new FilenameFilter() {
+        @Override
+        public boolean accept(File dir, String name) {
+            return !name.startsWith(".");
+        }
+    };
+
+    /**
+     * Scans every I/O device for this index's R-tree files and returns the valid ones, newest first.
+     * <p>
+     * A file whose timestamp interval is contained in the interval of the previously retained file is deleted
+     * from disk. Two intervals that overlap without one containing the other are unexpected and cause a
+     * {@link HyracksDataException}.
+     */
+    @Override
+    public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException, IndexException {
+        List<LSMComponentFileReferences> validFiles = new ArrayList<LSMComponentFileReferences>();
+        ArrayList<ComparableFileName> allFiles = new ArrayList<ComparableFileName>();
+
+        // Gather files from all IODeviceHandles and delete invalid files
+        // There are two types of invalid files:
+        // (1) The isValid flag is not set
+        // (2) The file's interval is contained by some other file
+        // Here, we only filter out (1).
+        for (IODeviceHandle dev : ioManager.getIODevices()) {
+            cleanupAndGetValidFilesInternal(dev, fileNameFilter, rtreeFactory, allFiles);
+        }
+
+        if (allFiles.isEmpty()) {
+            return validFiles;
+        }
+
+        if (allFiles.size() == 1) {
+            validFiles.add(new LSMComponentFileReferences(allFiles.get(0).fileRef, null, null));
+            return validFiles;
+        }
+
+        // Sorts file names from earliest to latest timestamp.
+        Collections.sort(allFiles);
+
+        List<ComparableFileName> validComparableFiles = new ArrayList<ComparableFileName>();
+        ComparableFileName last = allFiles.get(0);
+        validComparableFiles.add(last);
+        for (int i = 1; i < allFiles.size(); i++) {
+            ComparableFileName current = allFiles.get(i);
+            // The current start timestamp is greater than last stop timestamp so current is valid.
+            if (current.interval[0].compareTo(last.interval[1]) > 0) {
+                validComparableFiles.add(current);
+                last = current;
+            } else if (current.interval[0].compareTo(last.interval[0]) >= 0
+                    && current.interval[1].compareTo(last.interval[1]) <= 0) {
+                // The current file is completely contained in the interval of the
+                // last file. Thus the last file must contain at least as much information
+                // as the current file, so delete the current file.
+                current.fileRef.delete();
+            } else {
+                // This scenario should not be possible since timestamps are monotonically increasing.
+                throw new HyracksDataException("Found LSM files with overlapping timestamp intervals, "
+                        + "but the intervals were not contained by another file.");
+            }
+        }
+
+        // Sort valid files in reverse lexicographical order, such that newer files come first.
+        Collections.sort(validComparableFiles, recencyCmp);
+        for (ComparableFileName cmpFileName : validComparableFiles) {
+            validFiles.add(new LSMComponentFileReferences(cmpFileName.fileRef, null, null));
+        }
+
+        return validFiles;
+    }
+}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java
index e4d2e82..b3a4ab0 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java
@@ -39,11 +39,11 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTree;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeWithAntiMatterTuples;
+import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeWithAntiMatterTuplesFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.RTreeFactory;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.tuples.LSMRTreeCopyTupleWriterFactory;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.tuples.LSMRTreeTupleWriterFactory;
@@ -172,8 +172,8 @@
IBinaryComparatorFactory[] linearizerArray = { linearizerCmpFactory,
btreeCmpFactories[btreeCmpFactories.length - 1] };
- ILSMIndexFileManager fileNameManager = new LSMIndexFileManager(ioManager, diskFileMapProvider, file,
- diskRTreeFactory, startIODeviceIndex);
+ ILSMIndexFileManager fileNameManager = new LSMRTreeWithAntiMatterTuplesFileManager(ioManager,
+ diskFileMapProvider, file, diskRTreeFactory, startIODeviceIndex);
LSMRTreeWithAntiMatterTuples lsmTree = new LSMRTreeWithAntiMatterTuples(memBufferCache, memFreePageManager,
rtreeInteriorFrameFactory, rtreeLeafFrameFactory, btreeInteriorFrameFactory, btreeLeafFrameFactory,
fileNameManager, diskRTreeFactory, bulkLoadRTreeFactory, diskFileMapProvider, typeTraits.length,
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
index 773d593..c12dc50 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
@@ -848,7 +848,8 @@
}
@Override
- public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput) throws TreeIndexException {
+ public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+ throws TreeIndexException {
// TODO: verifyInput currently does nothing.
try {
return new RTreeBulkLoader(fillFactor);
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java
index 770a2ad..81f7fce 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java
@@ -591,7 +591,7 @@
LOGGER.info("Bulk loading " + ins + " tuples");
}
long start = System.currentTimeMillis();
- IIndexBulkLoader bulkLoader = treeIndex.createBulkLoader(0.7f, false);
+ IIndexBulkLoader bulkLoader = treeIndex.createBulkLoader(0.7f, false, ins);
ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldCount);
ArrayTupleReference tuple = new ArrayTupleReference();
for (int i = 0; i < ins; i++) {
@@ -656,7 +656,7 @@
treeIndex.activate();
// Load sorted records, and expect to fail at tuple i.
- IIndexBulkLoader bulkLoader = treeIndex.createBulkLoader(0.7f, true);
+ IIndexBulkLoader bulkLoader = treeIndex.createBulkLoader(0.7f, true, ins);
for (int j = 0; j < ins; j++) {
if (j > i) {
fail("Bulk load failure test unexpectedly succeeded past tuple: " + j);
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/TreeIndexTestUtils.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/TreeIndexTestUtils.java
index 818373c..1a80231 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/TreeIndexTestUtils.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/TreeIndexTestUtils.java
@@ -106,7 +106,7 @@
}
public void checkDiskOrderScan(IIndexTestContext ctx) throws Exception {
- try {
+ try {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Testing Disk-Order Scan.");
}
@@ -243,7 +243,7 @@
ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fieldCount);
ArrayTupleReference tuple = new ArrayTupleReference();
// Perform bulk load.
- IIndexBulkLoader bulkLoader = ctx.getIndex().createBulkLoader(0.7f, false);
+ IIndexBulkLoader bulkLoader = ctx.getIndex().createBulkLoader(0.7f, false, numTuples);
int c = 1;
for (CheckTuple checkTuple : checkTuples) {
if (LOGGER.isLoggable(Level.INFO)) {
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java
index 3998108..f962200 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java
@@ -91,6 +91,15 @@
public static final int LSM_INVINDEX_SCAN_COUNT_ARRAY_SIZE = 1000000;
public static final int LSM_INVINDEX_MULTITHREAD_NUM_OPERATIONS = 200;
+ // Test params for BloomFilter
+ public static final int BLOOM_FILTER_NUM_TUPLES_TO_INSERT = 100;
+
+ // Mem configuration for BloomFilter.
+ public static final int BLOOM_FILTER_PAGE_SIZE = 256;
+ public static final int BLOOM_FILTER_NUM_PAGES = 1000;
+ public static final int BLOOM_FILTER_MAX_OPEN_FILES = 10;
+ public static final int BLOOM_FILTER_HYRACKS_FRAME_SIZE = 128;
+
}
/* ORIGINAL TEST PARAMETERS: DO NOT EDIT!
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/AbstractRTreeExamplesTest.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/AbstractRTreeExamplesTest.java
index e911a98..f93e9b6 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/AbstractRTreeExamplesTest.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/AbstractRTreeExamplesTest.java
@@ -688,7 +688,7 @@
LOGGER.info("Bulk loading " + numInserts + " tuples");
}
long start = System.currentTimeMillis();
- IIndexBulkLoader bulkLoader = treeIndex.createBulkLoader(0.7f, false);
+ IIndexBulkLoader bulkLoader = treeIndex.createBulkLoader(0.7f, false, numInserts);
ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldCount);
ArrayTupleReference tuple = new ArrayTupleReference();
diff --git a/hyracks-tests/hyracks-storage-am-bloomfilter-test/pom.xml b/hyracks-tests/hyracks-storage-am-bloomfilter-test/pom.xml
new file mode 100644
index 0000000..3b15677
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-bloomfilter-test/pom.xml
@@ -0,0 +1,49 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-bloomfilter-test</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-tests</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.1</version>
+ <type>jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-bloomfilter</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-test-support</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/BloomFilterTest.java b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/BloomFilterTest.java
new file mode 100644
index 0000000..8856da1
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/BloomFilterTest.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.bloomfilter;
+
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.logging.Level;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.util.AbstractBloomFilterTest;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+
+@SuppressWarnings("rawtypes")
+public class BloomFilterTest extends AbstractBloomFilterTest {
+ private final Random rnd = new Random(50);
+
+ @Before
+ public void setUp() throws HyracksDataException {
+ super.setUp();
+ }
+
+ @Test
+ public void singleFieldTest() throws Exception {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("TESTING BLOOM FILTER");
+ }
+
+ IBufferCache bufferCache = harness.getBufferCache();
+
+ int numElements = 100;
+ int[] keyFields = { 0 };
+ int numHashes = 5;
+
+ BloomFilter bf = new BloomFilter(bufferCache, harness.getFileMapProvider(), harness.getFileReference(),
+ keyFields);
+
+ bf.create();
+ bf.activate();
+ IIndexBulkLoader builder = bf.createBuilder(numElements, numHashes);
+
+ int fieldCount = 2;
+ ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fieldCount);
+ ArrayTupleReference tuple = new ArrayTupleReference();
+
+ // generate keys
+ int maxKey = 1000;
+ TreeSet<Integer> uniqueKeys = new TreeSet<Integer>();
+ ArrayList<Integer> keys = new ArrayList<Integer>();
+ while (uniqueKeys.size() < numElements) {
+ int key = rnd.nextInt() % maxKey;
+ uniqueKeys.add(key);
+ }
+ for (Integer i : uniqueKeys) {
+ keys.add(i);
+ }
+
+ // Insert tuples in the bloom filter
+ for (int i = 0; i < keys.size(); ++i) {
+ TupleUtils.createIntegerTuple(tupleBuilder, tuple, keys.get(i), i);
+ builder.add(tuple);
+ }
+ builder.end();
+
+ // Check all the inserted tuples can be found.
+
+ long[] hashes = new long[2];
+ for (int i = 0; i < keys.size(); ++i) {
+ TupleUtils.createIntegerTuple(tupleBuilder, tuple, keys.get(i), i);
+ Assert.assertTrue(bf.contains(tuple, hashes));
+ }
+
+ bf.deactivate();
+ bf.destroy();
+ }
+
+ @Test
+ public void multiFieldTest() throws Exception {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("TESTING BLOOM FILTER");
+ }
+
+ IBufferCache bufferCache = harness.getBufferCache();
+
+ int numElements = 10000;
+ int[] keyFields = { 2, 4, 1 };
+ int numHashes = 10;
+
+ BloomFilter bf = new BloomFilter(bufferCache, harness.getFileMapProvider(), harness.getFileReference(),
+ keyFields);
+
+ bf.create();
+ bf.activate();
+ IIndexBulkLoader builder = bf.createBuilder(numElements, numHashes);
+
+ int fieldCount = 5;
+ ISerializerDeserializer[] fieldSerdes = { UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE };
+ ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fieldCount);
+ ArrayTupleReference tuple = new ArrayTupleReference();
+
+ int maxLength = 20;
+ ArrayList<String> s1 = new ArrayList<String>();
+ ArrayList<String> s2 = new ArrayList<String>();
+ ArrayList<String> s3 = new ArrayList<String>();
+ ArrayList<String> s4 = new ArrayList<String>();
+ for (int i = 0; i < numElements; ++i) {
+ s1.add(randomString(rnd.nextInt() % maxLength, rnd));
+ s2.add(randomString(rnd.nextInt() % maxLength, rnd));
+ s3.add(randomString(rnd.nextInt() % maxLength, rnd));
+ s4.add(randomString(rnd.nextInt() % maxLength, rnd));
+ }
+
+ for (int i = 0; i < numElements; ++i) {
+ TupleUtils.createTuple(tupleBuilder, tuple, fieldSerdes, s1.get(i), s2.get(i), i, s3.get(i), s4.get(i));
+ builder.add(tuple);
+ }
+ builder.end();
+
+ long[] hashes = new long[2];
+ for (int i = 0; i < numElements; ++i) {
+ TupleUtils.createTuple(tupleBuilder, tuple, fieldSerdes, s1.get(i), s2.get(i), i, s3.get(i), s4.get(i));
+ Assert.assertTrue(bf.contains(tuple, hashes));
+ }
+
+ bf.deactivate();
+ bf.destroy();
+ }
+}
diff --git a/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/MurmurHashForITupleReferenceTest.java b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/MurmurHashForITupleReferenceTest.java
new file mode 100644
index 0000000..284a6cb
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/MurmurHashForITupleReferenceTest.java
@@ -0,0 +1,296 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.bloomfilter;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.logging.Level;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.util.AbstractBloomFilterTest;
+
+@SuppressWarnings("rawtypes")
+public class MurmurHashForITupleReferenceTest extends AbstractBloomFilterTest {
+ private final static int NUM_LONG_VARS_FOR_128_BIT_HASH = 2;
+ private final static int DUMMY_FIELD = 0;
+ private final Random rnd = new Random(50);
+
+ @Before
+ public void setUp() throws HyracksDataException {
+ super.setUp();
+ }
+
+ @Test
+ public void murmurhashONEIntegerFieldTest() throws Exception {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("TESTING MURMUR HASH ONE INTEGER FIELD");
+ }
+
+ int fieldCount = 2;
+ ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fieldCount);
+ ArrayTupleReference tuple = new ArrayTupleReference();
+ TupleUtils.createIntegerTuple(tupleBuilder, tuple, rnd.nextInt());
+ tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+
+ int keyFields[] = { 0 };
+ int length = getTupleSize(tuple, keyFields);
+
+ long actuals[] = new long[NUM_LONG_VARS_FOR_128_BIT_HASH];
+ MurmurHash128Bit.hash3_x64_128(tuple, keyFields, 0L, actuals);
+
+ ByteBuffer buffer;
+ byte[] array = new byte[length];
+ fillArrayWithData(array, keyFields, tuple, length);
+ buffer = ByteBuffer.wrap(array);
+
+ long[] expecteds = hash3_x64_128(buffer, 0, length, 0L);
+ Assert.assertArrayEquals(expecteds, actuals);
+ }
+
+ @Test
+ public void murmurhashTwoIntegerFieldsTest() throws Exception {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("TESTING MURMUR HASH TWO INTEGER FIELDS");
+ }
+
+ int fieldCount = 2;
+ ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fieldCount);
+ ArrayTupleReference tuple = new ArrayTupleReference();
+ TupleUtils.createIntegerTuple(tupleBuilder, tuple, rnd.nextInt(), rnd.nextInt());
+ tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+
+ int keyFields[] = { 0, 1 };
+ int length = getTupleSize(tuple, keyFields);
+
+ long actuals[] = new long[NUM_LONG_VARS_FOR_128_BIT_HASH];
+ MurmurHash128Bit.hash3_x64_128(tuple, keyFields, 0L, actuals);
+
+ ByteBuffer buffer;
+ byte[] array = new byte[length];
+ fillArrayWithData(array, keyFields, tuple, length);
+ buffer = ByteBuffer.wrap(array);
+
+ long[] expecteds = hash3_x64_128(buffer, 0, length, 0L);
+ Assert.assertArrayEquals(expecteds, actuals);
+ }
+
+ @Test
+ public void murmurhashOneStringFieldTest() throws Exception {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("TESTING MURMUR HASH ONE STRING FIELD");
+ }
+
+ int fieldCount = 2;
+ ISerializerDeserializer[] fieldSerdes = { UTF8StringSerializerDeserializer.INSTANCE };
+ ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fieldCount);
+ ArrayTupleReference tuple = new ArrayTupleReference();
+ String s = randomString(100, rnd);
+ TupleUtils.createTuple(tupleBuilder, tuple, fieldSerdes, s);
+
+ int keyFields[] = { 0 };
+ int length = getTupleSize(tuple, keyFields);
+
+ long actuals[] = new long[NUM_LONG_VARS_FOR_128_BIT_HASH];
+ MurmurHash128Bit.hash3_x64_128(tuple, keyFields, 0L, actuals);
+
+ byte[] array = new byte[length];
+ ByteBuffer buffer;
+ fillArrayWithData(array, keyFields, tuple, length);
+ buffer = ByteBuffer.wrap(array);
+
+ long[] expecteds = hash3_x64_128(buffer, 0, length, 0L);
+ Assert.assertArrayEquals(expecteds, actuals);
+ }
+
+ @Test
+ public void murmurhashThreeStringFieldsTest() throws Exception {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("TESTING MURMUR HASH THREE STRING FIELDS");
+ }
+
+ int fieldCount = 3;
+ ISerializerDeserializer[] fieldSerdes = { UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE };
+ ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fieldCount);
+ ArrayTupleReference tuple = new ArrayTupleReference();
+ String s1 = randomString(40, rnd);
+ String s2 = randomString(60, rnd);
+ String s3 = randomString(20, rnd);
+ TupleUtils.createTuple(tupleBuilder, tuple, fieldSerdes, s1, s2, s3);
+
+ int keyFields[] = { 2, 0, 1 };
+ int length = getTupleSize(tuple, keyFields);
+
+ long actuals[] = new long[NUM_LONG_VARS_FOR_128_BIT_HASH];
+ MurmurHash128Bit.hash3_x64_128(tuple, keyFields, 0L, actuals);
+
+ byte[] array = new byte[length];
+ ByteBuffer buffer;
+ fillArrayWithData(array, keyFields, tuple, length);
+ buffer = ByteBuffer.wrap(array);
+
+ long[] expecteds = hash3_x64_128(buffer, 0, length, 0L);
+ Assert.assertArrayEquals(expecteds, actuals);
+ }
+
+ private void fillArrayWithData(byte[] array, int[] keyFields, ITupleReference tuple, int length) {
+ int currentFieldIndex = 0;
+ int bytePos = 0;
+ for (int i = 0; i < length; ++i) {
+ array[i] = tuple.getFieldData(DUMMY_FIELD)[tuple.getFieldStart(keyFields[currentFieldIndex]) + bytePos];
+ ++bytePos;
+ if (tuple.getFieldLength(keyFields[currentFieldIndex]) == bytePos) {
+ ++currentFieldIndex;
+ bytePos = 0;
+ }
+ }
+ }
+
+ private int getTupleSize(ITupleReference tuple, int[] keyFields) {
+ int length = 0;
+ for (int i = 0; i < keyFields.length; ++i) {
+ length += tuple.getFieldLength(keyFields[i]);
+ }
+ return length;
+ }
+
+ /**
+ * The hash3_x64_128 and getblock functions are borrowed from cassandra source code for testing purpose
+ **/
+ protected static long getblock(ByteBuffer key, int offset, int index) {
+ int i_8 = index << 3;
+ int blockOffset = offset + i_8;
+ return ((long) key.get(blockOffset + 0) & 0xff) + (((long) key.get(blockOffset + 1) & 0xff) << 8)
+ + (((long) key.get(blockOffset + 2) & 0xff) << 16) + (((long) key.get(blockOffset + 3) & 0xff) << 24)
+ + (((long) key.get(blockOffset + 4) & 0xff) << 32) + (((long) key.get(blockOffset + 5) & 0xff) << 40)
+ + (((long) key.get(blockOffset + 6) & 0xff) << 48) + (((long) key.get(blockOffset + 7) & 0xff) << 56);
+ }
+
+ public static long[] hash3_x64_128(ByteBuffer key, int offset, int length, long seed) {
+ final int nblocks = length >> 4; // Process as 128-bit blocks.
+
+ long h1 = seed;
+ long h2 = seed;
+
+ long c1 = 0x87c37b91114253d5L;
+ long c2 = 0x4cf5ad432745937fL;
+
+ //----------
+ // body
+
+ for (int i = 0; i < nblocks; i++) {
+ long k1 = getblock(key, offset, i * 2 + 0);
+ long k2 = getblock(key, offset, i * 2 + 1);
+
+ k1 *= c1;
+ k1 = MurmurHash128Bit.rotl64(k1, 31);
+ k1 *= c2;
+ h1 ^= k1;
+
+ h1 = MurmurHash128Bit.rotl64(h1, 27);
+ h1 += h2;
+ h1 = h1 * 5 + 0x52dce729;
+
+ k2 *= c2;
+ k2 = MurmurHash128Bit.rotl64(k2, 33);
+ k2 *= c1;
+ h2 ^= k2;
+
+ h2 = MurmurHash128Bit.rotl64(h2, 31);
+ h2 += h1;
+ h2 = h2 * 5 + 0x38495ab5;
+ }
+
+ //----------
+ // tail
+
+ // Advance offset to the unprocessed tail of the data.
+ offset += nblocks * 16;
+
+ long k1 = 0;
+ long k2 = 0;
+
+ switch (length & 15) {
+ case 15:
+ k2 ^= ((long) key.get(offset + 14)) << 48;
+ case 14:
+ k2 ^= ((long) key.get(offset + 13)) << 40;
+ case 13:
+ k2 ^= ((long) key.get(offset + 12)) << 32;
+ case 12:
+ k2 ^= ((long) key.get(offset + 11)) << 24;
+ case 11:
+ k2 ^= ((long) key.get(offset + 10)) << 16;
+ case 10:
+ k2 ^= ((long) key.get(offset + 9)) << 8;
+ case 9:
+ k2 ^= ((long) key.get(offset + 8)) << 0;
+ k2 *= c2;
+ k2 = MurmurHash128Bit.rotl64(k2, 33);
+ k2 *= c1;
+ h2 ^= k2;
+
+ case 8:
+ k1 ^= ((long) key.get(offset + 7)) << 56;
+ case 7:
+ k1 ^= ((long) key.get(offset + 6)) << 48;
+ case 6:
+ k1 ^= ((long) key.get(offset + 5)) << 40;
+ case 5:
+ k1 ^= ((long) key.get(offset + 4)) << 32;
+ case 4:
+ k1 ^= ((long) key.get(offset + 3)) << 24;
+ case 3:
+ k1 ^= ((long) key.get(offset + 2)) << 16;
+ case 2:
+ k1 ^= ((long) key.get(offset + 1)) << 8;
+ case 1:
+ k1 ^= ((long) key.get(offset));
+ k1 *= c1;
+ k1 = MurmurHash128Bit.rotl64(k1, 31);
+ k1 *= c2;
+ h1 ^= k1;
+ };
+
+ //----------
+ // finalization
+
+ h1 ^= length;
+ h2 ^= length;
+
+ h1 += h2;
+ h2 += h1;
+
+ h1 = MurmurHash128Bit.fmix(h1);
+ h2 = MurmurHash128Bit.fmix(h2);
+
+ h1 += h2;
+ h2 += h1;
+
+ return (new long[] { h1, h2 });
+ }
+}
diff --git a/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/util/AbstractBloomFilterTest.java b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/util/AbstractBloomFilterTest.java
new file mode 100644
index 0000000..9b857a6
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/util/AbstractBloomFilterTest.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.bloomfilter.util;
+
+import java.util.Random;
+import java.util.logging.Logger;
+
+import org.junit.After;
+import org.junit.Before;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class AbstractBloomFilterTest {
+ protected final Logger LOGGER = Logger.getLogger(BloomFilterTestHarness.class.getName());
+
+ protected final BloomFilterTestHarness harness;
+
+ public AbstractBloomFilterTest() {
+ harness = new BloomFilterTestHarness();
+ }
+
+ public AbstractBloomFilterTest(int pageSize, int numPages, int maxOpenFiles, int hyracksFrameSize) {
+ harness = new BloomFilterTestHarness(pageSize, numPages, maxOpenFiles, hyracksFrameSize);
+ }
+
+ @Before
+ public void setUp() throws HyracksDataException {
+ harness.setUp();
+ }
+
+ @After
+ public void tearDown() throws HyracksDataException {
+ harness.tearDown();
+ }
+
+ public static String randomString(int length, Random random) {
+ char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
+ StringBuilder strBuilder = new StringBuilder();
+ for (int i = 0; i < length; ++i) {
+ char c = chars[random.nextInt(chars.length)];
+ strBuilder.append(c);
+ }
+ return strBuilder.toString();
+ }
+}
diff --git a/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/util/BloomFilterTestHarness.java b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/util/BloomFilterTestHarness.java
new file mode 100644
index 0000000..8fac122
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/util/BloomFilterTestHarness.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.bloomfilter.util;
+
+import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Random;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+import edu.uci.ics.hyracks.test.support.TestStorageManagerComponentHolder;
+import edu.uci.ics.hyracks.test.support.TestUtils;
+
+public class BloomFilterTestHarness {
+
+ private static final long RANDOM_SEED = 50;
+
+ protected final int pageSize;
+ protected final int numPages;
+ protected final int maxOpenFiles;
+ protected final int hyracksFrameSize;
+
+ protected IHyracksTaskContext ctx;
+ protected IBufferCache bufferCache;
+ protected IFileMapProvider fileMapProvider;
+ protected FileReference file;
+
+ protected final Random rnd = new Random();
+ protected final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
+ protected final String tmpDir = System.getProperty("java.io.tmpdir");
+ protected final String sep = System.getProperty("file.separator");
+ protected String fileName;
+
+ public BloomFilterTestHarness() {
+ this.pageSize = AccessMethodTestsConfig.BLOOM_FILTER_PAGE_SIZE;
+ this.numPages = AccessMethodTestsConfig.BLOOM_FILTER_NUM_PAGES;
+ this.maxOpenFiles = AccessMethodTestsConfig.BLOOM_FILTER_MAX_OPEN_FILES;
+ this.hyracksFrameSize = AccessMethodTestsConfig.BLOOM_FILTER_HYRACKS_FRAME_SIZE;
+ }
+
+ public BloomFilterTestHarness(int pageSize, int numPages, int maxOpenFiles, int hyracksFrameSize) {
+ this.pageSize = pageSize;
+ this.numPages = numPages;
+ this.maxOpenFiles = maxOpenFiles;
+ this.hyracksFrameSize = hyracksFrameSize;
+ }
+
+ public void setUp() throws HyracksDataException {
+ fileName = tmpDir + sep + simpleDateFormat.format(new Date());
+ ctx = TestUtils.create(getHyracksFrameSize());
+ TestStorageManagerComponentHolder.init(pageSize, numPages, maxOpenFiles);
+ bufferCache = TestStorageManagerComponentHolder.getBufferCache(ctx);
+ fileMapProvider = TestStorageManagerComponentHolder.getFileMapProvider(ctx);
+ file = new FileReference(new File(fileName));
+ rnd.setSeed(RANDOM_SEED);
+ }
+
+ public void tearDown() throws HyracksDataException {
+ bufferCache.close();
+ file.delete();
+ }
+
+ public IHyracksTaskContext getHyracksTaskContext() {
+ return ctx;
+ }
+
+ public IBufferCache getBufferCache() {
+ return bufferCache;
+ }
+
+ public IFileMapProvider getFileMapProvider() {
+ return fileMapProvider;
+ }
+
+ public FileReference getFileReference() {
+ return file;
+ }
+
+ public String getFileName() {
+ return fileName;
+ }
+
+ public Random getRandom() {
+ return rnd;
+ }
+
+ public int getPageSize() {
+ return pageSize;
+ }
+
+ public int getNumPages() {
+ return numPages;
+ }
+
+ public int getHyracksFrameSize() {
+ return hyracksFrameSize;
+ }
+
+ public int getMaxOpenFiles() {
+ return maxOpenFiles;
+ }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
index d0d2c77..64f58c3 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
@@ -228,7 +228,7 @@
throw new IllegalArgumentException("Invalid range: [" + begin + ", " + end + "]");
}
- IIndexBulkLoader bulkloader = index.createBulkLoader(1.0f, false);
+ IIndexBulkLoader bulkloader = index.createBulkLoader(1.0f, false, end - begin);
for (int i = begin; i <= end; i++) {
TupleUtils.createIntegerTuple(builder, tuple, i);
bulkloader.add(tuple);
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/BTreeBulkLoadRunner.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/BTreeBulkLoadRunner.java
index 2e9395f..69e2b58 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/BTreeBulkLoadRunner.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/BTreeBulkLoadRunner.java
@@ -37,7 +37,7 @@
public long runExperiment(DataGenThread dataGen, int numThreads) throws Exception {
btree.create();
long start = System.currentTimeMillis();
- IIndexBulkLoader bulkLoader = btree.createBulkLoader(1.0f, false);
+ IIndexBulkLoader bulkLoader = btree.createBulkLoader(1.0f, false, 0L);
for (int i = 0; i < numBatches; i++) {
TupleBatch batch = dataGen.tupleBatchQueue.take();
for (int j = 0; j < batch.size(); j++) {
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
index 7b54884..97f78f3 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
@@ -205,7 +205,7 @@
ISerializerDeserializer[] fieldSerdes = testCtx.getFieldSerdes();
// Use the expected index to bulk-load the actual index.
- IIndexBulkLoader bulkLoader = testCtx.getIndex().createBulkLoader(1.0f, false);
+ IIndexBulkLoader bulkLoader = testCtx.getIndex().createBulkLoader(1.0f, false, numDocs);
ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(testCtx.getFieldSerdes().length);
ArrayTupleReference tuple = new ArrayTupleReference();
Iterator<CheckTuple> checkTupleIter = tmpMemIndex.iterator();
diff --git a/hyracks-tests/pom.xml b/hyracks-tests/pom.xml
index b79295a..4011339 100644
--- a/hyracks-tests/pom.xml
+++ b/hyracks-tests/pom.xml
@@ -19,5 +19,6 @@
<module>hyracks-storage-am-lsm-btree-test</module>
<module>hyracks-storage-am-lsm-rtree-test</module>
<module>hyracks-storage-am-lsm-invertedindex-test</module>
+ <module>hyracks-storage-am-bloomfilter-test</module>
</modules>
</project>
diff --git a/pom.xml b/pom.xml
index 49a5887..c642992 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,6 +95,7 @@
<module>hyracks-cli</module>
<module>hyracks-storage-common</module>
<module>hyracks-storage-am-common</module>
+ <module>hyracks-storage-am-bloomfilter</module>
<module>hyracks-storage-am-btree</module>
<module>hyracks-storage-am-lsm-invertedindex</module>
<module>hyracks-storage-am-lsm-common</module>