Made sure bloom filters don't hold latches after insertion is over (pages are still pinned). Changed the bloom filter insertion to use a bulkload-like interface. Changed the bulkload interface for all indexes to accept number of elements hint. Various bug fixes.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree_bloom_filter@2751 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
index e55882a..56d7837 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
@@ -152,7 +152,7 @@
IIndexDataflowHelperFactory dataflowHelperFactory = new BTreeDataflowHelperFactory();
TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
storageManager, lcManagerProvider, btreeSplitProvider, typeTraits, comparatorFactories,
- fieldPermutation, 0.7f, false, dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+ fieldPermutation, 0.7f, false, 1000L, dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
JobHelper.createPartitionConstraint(spec, btreeBulkLoad, splitNCs);
// distribute the records from the datagen via hashing to the bulk load
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
index 3a4e59f..d7480ec 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
@@ -144,7 +144,7 @@
IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.secondaryBTreeName);
TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
storageManager, lcManagerProvider, btreeSplitProvider, secondaryTypeTraits, comparatorFactories,
- fieldPermutation, 0.7f, false, dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+ fieldPermutation, 0.7f, false, 1000L, dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
JobHelper.createPartitionConstraint(spec, btreeBulkLoad, splitNCs);
// connect the ops
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
index 739f107..7e69d4d 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
@@ -172,7 +172,7 @@
int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
storageManager, lcManagerProvider, primarySplitProvider, primaryTypeTraits, primaryComparatorFactories,
- fieldPermutation, 0.7f, true, dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+ fieldPermutation, 0.7f, true, 1000L, dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -234,7 +234,7 @@
int[] fieldPermutation = { 3, 0 };
TreeIndexBulkLoadOperatorDescriptor secondaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
storageManager, lcManagerProvider, secondarySplitProvider, secondaryTypeTraits,
- secondaryComparatorFactories, fieldPermutation, 0.7f, true, dataflowHelperFactory,
+ secondaryComparatorFactories, fieldPermutation, 0.7f, true, 1000L, dataflowHelperFactory,
NoOpOperationCallbackFactory.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBtreeBulkLoad, NC1_ID);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
index 05b7a26..3a77b60 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
@@ -199,7 +199,7 @@
int[] fieldPermutation = { 0, 1 };
TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
storageManager, lcManagerProvider, primaryFileSplitProvider, primaryTypeTraits,
- primaryComparatorFactories, fieldPermutation, 0.7f, true, btreeDataflowHelperFactory,
+ primaryComparatorFactories, fieldPermutation, 0.7f, true, 1000L, btreeDataflowHelperFactory,
NoOpOperationCallbackFactory.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
return primaryBtreeBulkLoad;
@@ -273,7 +273,7 @@
private IOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec, int[] fieldPermutation) {
LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new LSMInvertedIndexBulkLoadOperatorDescriptor(
- spec, fieldPermutation, true, storageManager, btreeFileSplitProvider, lcManagerProvider,
+ spec, fieldPermutation, true, 1000L, storageManager, btreeFileSplitProvider, lcManagerProvider,
tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, invListsComparatorFactories,
tokenizerFactory, invertedIndexDataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, invIndexBulkLoadOp, NC1_ID);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
index 428b354..97f6908 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
@@ -223,7 +223,7 @@
int[] fieldPermutation = { 0, 1, 2, 4, 5, 7, 9, 10, 11, 12 };
TreeIndexBulkLoadOperatorDescriptor primaryBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
storageManager, lcManagerProvider, primarySplitProvider, primaryTypeTraits, primaryComparatorFactories,
- fieldPermutation, 0.7f, false, btreeDataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+ fieldPermutation, 0.7f, false, 1000L, btreeDataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBulkLoad, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -279,7 +279,7 @@
int[] fieldPermutation = { 6, 7, 8, 9, 0 };
TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
storageManager, lcManagerProvider, secondarySplitProvider, secondaryTypeTraits,
- secondaryComparatorFactories, fieldPermutation, 0.7f, false, rtreeDataflowHelperFactory,
+ secondaryComparatorFactories, fieldPermutation, 0.7f, false, 1000L, rtreeDataflowHelperFactory,
NoOpOperationCallbackFactory.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBulkLoad, NC1_ID);
diff --git a/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilter.java b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
index 69010f3..a6a45de 100644
--- a/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
+++ b/hyracks-storage-am-bloomfilter/src/main/java/edu/uci/ics/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
@@ -21,6 +21,8 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
@@ -58,6 +60,7 @@
this.fileMapProvider = fileMapProvider;
this.file = file;
this.keyFields = keyFields;
+ numBitsPerPage = bufferCache.getPageSize() * Byte.SIZE;
}
public int getFileId() {
@@ -82,21 +85,6 @@
return numElements;
}
- public void add(ITupleReference tuple, long[] hashes) {
- MurmurHash128Bit.hash3_x64_128(tuple, keyFields, SEED, hashes);
- for (int i = 0; i < numHashes; ++i) {
- long hash = Math.abs((hashes[0] + (long) i * hashes[1]) % numBits);
-
- ByteBuffer buffer = bloomFilterPages.get((int) (hash / numBitsPerPage)).getBuffer();
- int byteIndex = (int) (hash % numBitsPerPage) >> 3; // divide by 8
- byte b = buffer.get(byteIndex);
- int bitIndex = (int) (hash % numBitsPerPage) & 0x07; // mod 8
- b = (byte) (b | (1 << bitIndex));
-
- buffer.put(byteIndex, b);
- }
- }
-
public boolean contains(ITupleReference tuple, long[] hashes) {
MurmurHash128Bit.hash3_x64_128(tuple, keyFields, SEED, hashes);
for (int i = 0; i < numHashes; ++i) {
@@ -135,44 +123,19 @@
}
}
- private void persistBloomFilterMetaData() throws HyracksDataException {
- ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), false);
- metaPage.acquireWriteLatch();
- metaPage.getBuffer().putInt(NUM_PAGES_OFFSET, numPages);
- metaPage.getBuffer().putInt(NUM_HASHES_USED_OFFSET, numHashes);
- metaPage.getBuffer().putLong(NUM_ELEMENTS_OFFSET, numElements);
- metaPage.getBuffer().putLong(NUM_BITS_OFFSET, numBits);
- metaPage.releaseWriteLatch();
- bufferCache.unpin(metaPage);
- }
-
- private void readBloomFilterMetaData() throws HyracksDataException {
- ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), false);
- metaPage.acquireReadLatch();
- numPages = metaPage.getBuffer().getInt(NUM_PAGES_OFFSET);
- numHashes = metaPage.getBuffer().getInt(NUM_HASHES_USED_OFFSET);
- numElements = metaPage.getBuffer().getLong(NUM_ELEMENTS_OFFSET);
- numBits = metaPage.getBuffer().getLong(NUM_BITS_OFFSET);
- metaPage.releaseReadLatch();
- bufferCache.unpin(metaPage);
- }
-
- public synchronized void create(long numElements, int numHashes) throws HyracksDataException {
+ public synchronized void create() throws HyracksDataException {
if (isActivated) {
throw new HyracksDataException("Failed to create the bloom filter since it is activated.");
}
- this.numElements = numElements;
- this.numHashes = numHashes;
- numBitsPerPage = bufferCache.getPageSize() * Byte.SIZE;
- numBits = numElements * NUM_BITS_PER_ELEMENT;
- long tmp = (long) Math.ceil(numBits / (double) numBitsPerPage);
- if (tmp > Integer.MAX_VALUE) {
- throw new HyracksDataException("Cannot create a bloom filter with his huge number of pages.");
- }
- numPages = (int) tmp;
-
prepareFile();
- persistBloomFilterMetaData();
+ ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), true);
+ metaPage.acquireWriteLatch();
+ metaPage.getBuffer().putInt(NUM_PAGES_OFFSET, 0);
+ metaPage.getBuffer().putInt(NUM_HASHES_USED_OFFSET, 0);
+ metaPage.getBuffer().putLong(NUM_ELEMENTS_OFFSET, 0L);
+ metaPage.getBuffer().putLong(NUM_BITS_OFFSET, 0L);
+ metaPage.releaseWriteLatch();
+ bufferCache.unpin(metaPage);
bufferCache.closeFile(fileId);
}
@@ -187,25 +150,30 @@
int currentPageId = 1;
while (currentPageId <= numPages) {
ICachedPage page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, currentPageId), false);
- page.acquireWriteLatch();
bloomFilterPages.add(page);
++currentPageId;
}
isActivated = true;
}
+ private void readBloomFilterMetaData() throws HyracksDataException {
+ ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), false);
+ metaPage.acquireReadLatch();
+ numPages = metaPage.getBuffer().getInt(NUM_PAGES_OFFSET);
+ numHashes = metaPage.getBuffer().getInt(NUM_HASHES_USED_OFFSET);
+ numElements = metaPage.getBuffer().getLong(NUM_ELEMENTS_OFFSET);
+ numBits = metaPage.getBuffer().getLong(NUM_BITS_OFFSET);
+ metaPage.releaseReadLatch();
+ bufferCache.unpin(metaPage);
+ }
+
public synchronized void deactivate() throws HyracksDataException {
if (!isActivated) {
return;
}
- if (fileId == 1) {
- System.out.println();
- }
for (int i = 0; i < numPages; ++i) {
- ICachedPage page = bloomFilterPages.get(i);
- page.releaseWriteLatch();
- bufferCache.unpin(page);
+ bufferCache.unpin(bloomFilterPages.get(i));
}
bloomFilterPages.clear();
bufferCache.closeFile(fileId);
@@ -224,4 +192,77 @@
bufferCache.deleteFile(fileId, false);
fileId = -1;
}
+
+ public IIndexBulkLoader createBuilder(long numElements, int numHashes) throws HyracksDataException {
+ return new BloomFilterBuilder(numElements, numHashes);
+ }
+
+ public class BloomFilterBuilder implements IIndexBulkLoader {
+ private final long[] hashes = new long[2];
+
+ private final long numElements;
+ private final int numHashes;
+ private final long numBits;
+ private final int numPages;
+
+ public BloomFilterBuilder(long numElements, int numHashes) throws HyracksDataException {
+ if (!isActivated) {
+ throw new HyracksDataException("Failed to create the bloom filter builder since it is not activated.");
+ }
+ this.numElements = numElements;
+ this.numHashes = numHashes;
+ numBits = numElements * NUM_BITS_PER_ELEMENT;
+ long tmp = (long) Math.ceil(numBits / (double) numBitsPerPage);
+ if (tmp > Integer.MAX_VALUE) {
+ throw new HyracksDataException("Cannot create a bloom filter with his huge number of pages.");
+ }
+ numPages = (int) tmp;
+ persistBloomFilterMetaData();
+ readBloomFilterMetaData();
+
+ int currentPageId = 1;
+ while (currentPageId <= numPages) {
+ ICachedPage page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, currentPageId), true);
+ page.acquireWriteLatch();
+ bloomFilterPages.add(page);
+ ++currentPageId;
+ }
+ }
+
+ private void persistBloomFilterMetaData() throws HyracksDataException {
+ ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), false);
+ metaPage.acquireWriteLatch();
+ metaPage.getBuffer().putInt(NUM_PAGES_OFFSET, numPages);
+ metaPage.getBuffer().putInt(NUM_HASHES_USED_OFFSET, numHashes);
+ metaPage.getBuffer().putLong(NUM_ELEMENTS_OFFSET, numElements);
+ metaPage.getBuffer().putLong(NUM_BITS_OFFSET, numBits);
+ metaPage.releaseWriteLatch();
+ bufferCache.unpin(metaPage);
+ }
+
+ @Override
+ public void add(ITupleReference tuple) throws IndexException, HyracksDataException {
+ MurmurHash128Bit.hash3_x64_128(tuple, keyFields, SEED, hashes);
+ for (int i = 0; i < numHashes; ++i) {
+ long hash = Math.abs((hashes[0] + (long) i * hashes[1]) % numBits);
+
+ ByteBuffer buffer = bloomFilterPages.get((int) (hash / numBitsPerPage)).getBuffer();
+ int byteIndex = (int) (hash % numBitsPerPage) >> 3; // divide by 8
+ byte b = buffer.get(byteIndex);
+ int bitIndex = (int) (hash % numBitsPerPage) & 0x07; // mod 8
+ b = (byte) (b | (1 << bitIndex));
+
+ buffer.put(byteIndex, b);
+ }
+ }
+
+ @Override
+ public void end() throws HyracksDataException, IndexException {
+ for (int i = 0; i < numPages; ++i) {
+ ICachedPage page = bloomFilterPages.get(i);
+ page.releaseWriteLatch();
+ }
+ }
+
+ }
}
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
index d825b89..86bc32a 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
@@ -910,7 +910,8 @@
}
@Override
- public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput) throws TreeIndexException {
+ public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+ throws TreeIndexException {
try {
return new BTreeBulkLoader(fillFactor, verifyInput);
} catch (HyracksDataException e) {
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndex.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndex.java
index d6d74ee..1557c75 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndex.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndex.java
@@ -22,69 +22,62 @@
* This interface describes the operations common to all indexes. Indexes
* implementing this interface can easily reuse existing index operators for
* dataflow. Users must perform operations on an via an {@link IIndexAccessor}.
- *
- * During dataflow, the lifecycle of IIndexes are handled through an
- * {@link IIndexLifecycleManager}.
+ * During dataflow, the lifecycle of IIndexes are handled through an {@link IIndexLifecycleManager}.
*/
public interface IIndex {
/**
- * Initializes the persistent state of an index.
- *
+ * Initializes the persistent state of an index.
* An index cannot be created if it is in the activated state.
* Calling create on an index that is deactivated has the effect of clearing the index.
*
- * @throws HyracksDataException
- * if there is an error in the BufferCache while (un)pinning pages, (un)latching pages,
- * creating files, or deleting files
- *
- * if the index is in the activated state
+ * @throws HyracksDataException
+ * if there is an error in the BufferCache while (un)pinning pages, (un)latching pages,
+ * creating files, or deleting files
+ * if the index is in the activated state
*/
public void create() throws HyracksDataException;
/**
- * Initializes the index's operational state. An index in the activated state may perform
+ * Initializes the index's operational state. An index in the activated state may perform
* operations via an {@link IIndexAccessor}.
*
* @throws HyracksDataException
- * if there is a problem in the BufferCache while (un)pinning pages, (un)latching pages,
- * creating files, or deleting files
+ * if there is a problem in the BufferCache while (un)pinning pages, (un)latching pages,
+ * creating files, or deleting files
*/
public void activate() throws HyracksDataException;
/**
- * Resets the operational state of the index. Calling clear has the same logical effect
- * as calling deactivate(), destroy(), create(), then activate(), but not necessarily the
+ * Resets the operational state of the index. Calling clear has the same logical effect
+ * as calling deactivate(), destroy(), create(), then activate(), but not necessarily the
* same physical effect.
*
* @throws HyracksDataException
- * if there is a problem in the BufferCache while (un)pinning pages, (un)latching pages,
- * creating files, or deleting files
- *
- * if the index is not in the activated state
+ * if there is a problem in the BufferCache while (un)pinning pages, (un)latching pages,
+ * creating files, or deleting files
+ * if the index is not in the activated state
*/
public void clear() throws HyracksDataException;
/**
- * Deinitializes the index's operational state. An index in the deactivated state may not
+ * Deinitializes the index's operational state. An index in the deactivated state may not
* perform operations.
*
* @throws HyracksDataException
- * if there is a problem in the BufferCache while (un)pinning pages, (un)latching pages,
- * creating files, or deleting files
+ * if there is a problem in the BufferCache while (un)pinning pages, (un)latching pages,
+ * creating files, or deleting files
*/
public void deactivate() throws HyracksDataException;
/**
- * Removes the persistent state of an index.
- *
+ * Removes the persistent state of an index.
* An index cannot be destroyed if it is in the activated state.
*
- * @throws HyracksDataException
- * if there is an error in the BufferCache while (un)pinning pages, (un)latching pages,
- * creating files, or deleting files
- *
- * if the index is already activated
+ * @throws HyracksDataException
+ * if there is an error in the BufferCache while (un)pinning pages, (un)latching pages,
+ * creating files, or deleting files
+ * if the index is already activated
*/
public void destroy() throws HyracksDataException;
@@ -94,8 +87,10 @@
* on the same {@link IIndex}.
*
* @returns IIndexAccessor an accessor for this {@link IIndex}
- * @param modificationCallback the callback to be used for modification operations
- * @param searchCallback the callback to be used for search operations
+ * @param modificationCallback
+ * the callback to be used for modification operations
+ * @param searchCallback
+ * the callback to be used for search operations
*/
public IIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
ISearchOperationCallback searchCallback);
@@ -105,7 +100,7 @@
* An assertion error is thrown if validation fails.
*
* @throws HyracksDataException
- * if there is an error performing validation
+ * if there is an error performing validation
*/
public void validate() throws HyracksDataException;
@@ -124,5 +119,6 @@
* @param verifyInput
* @throws IndexException
*/
- public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput) throws IndexException;
+ public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+ throws IndexException;
}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index de4e627..1b6271d 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -33,6 +33,7 @@
private final IHyracksTaskContext ctx;
private final float fillFactor;
private final boolean verifyInput;
+ private final long numElementsHint;
private final IIndexDataflowHelper indexHelper;
private FrameTupleAccessor accessor;
private IIndex index;
@@ -41,12 +42,14 @@
private PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
public IndexBulkLoadOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
- int[] fieldPermutation, float fillFactor, boolean verifyInput, IRecordDescriptorProvider recordDescProvider) {
+ int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint,
+ IRecordDescriptorProvider recordDescProvider) {
this.opDesc = opDesc;
this.ctx = ctx;
this.indexHelper = opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(opDesc, ctx, partition);
this.fillFactor = fillFactor;
this.verifyInput = verifyInput;
+ this.numElementsHint = numElementsHint;
this.recDescProvider = recordDescProvider;
tuple.setFieldPermutation(fieldPermutation);
}
@@ -58,7 +61,7 @@
indexHelper.open();
index = indexHelper.getIndexInstance();
try {
- bulkLoader = index.createBulkLoader(fillFactor, verifyInput);
+ bulkLoader = index.createBulkLoader(fillFactor, verifyInput, numElementsHint);
} catch (Exception e) {
indexHelper.close();
throw new HyracksDataException(e);
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
index fea6463..c58dee5 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
@@ -35,12 +35,13 @@
private final int[] fieldPermutation;
private final float fillFactor;
private final boolean verifyInput;
+ private final long numElementsHint;
public TreeIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec,
IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lifecycleManagerProvider,
IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, int[] fieldPermutation, float fillFactor,
- boolean verifyInput, IIndexDataflowHelperFactory dataflowHelperFactory,
+ boolean verifyInput, long numElementsHint, IIndexDataflowHelperFactory dataflowHelperFactory,
IModificationOperationCallbackFactory modificationOpCallbackFactory) {
super(spec, 1, 0, null, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, dataflowHelperFactory, null, false, NoOpLocalResourceFactoryProvider.INSTANCE,
@@ -48,12 +49,13 @@
this.fieldPermutation = fieldPermutation;
this.fillFactor = fillFactor;
this.verifyInput = verifyInput;
+ this.numElementsHint = numElementsHint;
}
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
return new IndexBulkLoadOperatorNodePushable(this, ctx, partition, fieldPermutation, fillFactor, verifyInput,
- recordDescProvider);
+ numElementsHint, recordDescProvider);
}
}
\ No newline at end of file
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index 370e094..9bf4a4f 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -152,9 +152,6 @@
return;
}
- if (fileId == 0) {
- System.out.println();
- }
bufferCache.closeFile(fileId);
freePageManager.close();
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 6d76a86..b557125 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -387,20 +387,20 @@
LSMBTreeImmutableComponent component = createDiskComponent(componentFactory, flushOp.getBTreeFlushTarget(),
flushOp.getBloomFilterFlushTarget(), numElements, numHashes, true);
- IIndexBulkLoader bulkLoader = component.getBTree().createBulkLoader(1.0f, false);
- BloomFilter bf = component.getBloomFilter();
- long[] hashes = new long[2];
+ IIndexBulkLoader bulkLoader = component.getBTree().createBulkLoader(1.0f, false, numElements);
+ IIndexBulkLoader builder = component.getBloomFilter().createBuilder(numElements, numHashes);
IIndexCursor scanCursor = accessor.createSearchCursor();
accessor.search(scanCursor, nullPred);
try {
while (scanCursor.hasNext()) {
scanCursor.next();
- bf.add(scanCursor.getTuple(), hashes);
+ builder.add(scanCursor.getTuple());
bulkLoader.add(scanCursor.getTuple());
}
} finally {
scanCursor.close();
+ builder.end();
}
bulkLoader.end();
return component;
@@ -440,25 +440,24 @@
numElements += ((LSMBTreeImmutableComponent) mergedComponents.get(i)).getBloomFilter().getNumElements();
}
- LSMBTreeImmutableComponent mergedBTree = createDiskComponent(componentFactory, mergeOp.getBTreeMergeTarget(),
- mergeOp.getBloomFilterMergeTarget(), numElements, numHashes, true);
+ LSMBTreeImmutableComponent mergedComponent = createDiskComponent(componentFactory,
+ mergeOp.getBTreeMergeTarget(), mergeOp.getBloomFilterMergeTarget(), numElements, numHashes, true);
- BloomFilter bf = mergedBTree.getBloomFilter();
- long[] hashes = new long[2];
-
- IIndexBulkLoader bulkLoader = mergedBTree.getBTree().createBulkLoader(1.0f, false);
+ IIndexBulkLoader bulkLoader = mergedComponent.getBTree().createBulkLoader(1.0f, false, numElements);
+ IIndexBulkLoader builder = mergedComponent.getBloomFilter().createBuilder(numElements, numHashes);
try {
while (cursor.hasNext()) {
cursor.next();
ITupleReference frameTuple = cursor.getTuple();
- bf.add(frameTuple, hashes);
+ builder.add(frameTuple);
bulkLoader.add(frameTuple);
}
} finally {
cursor.close();
+ builder.end();
}
bulkLoader.end();
- return mergedBTree;
+ return mergedComponent;
}
private LSMBTreeImmutableComponent createDiskComponent(LSMBTreeImmutableComponentFactory factory,
@@ -469,7 +468,7 @@
.createLSMComponentInstance(new LSMComponentFileReferences(btreeFileRef, null, bloomFilterFileRef));
if (createComponent) {
component.getBTree().create();
- component.getBloomFilter().create(numElements, numHashes);
+ component.getBloomFilter().create();
}
// BTree will be closed during cleanup of merge().
component.getBTree().activate();
@@ -478,8 +477,19 @@
}
@Override
- public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput) throws TreeIndexException {
- return new LSMBTreeBulkLoader(fillLevel, verifyInput);
+ public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
+ throws TreeIndexException {
+ try {
+ return new LSMBTreeBulkLoader(fillLevel, verifyInput, numElementsHint);
+ } catch (HyracksDataException e) {
+ throw new TreeIndexException(e);
+ }
+ }
+
+ private ILSMComponent createBulkLoadTarget(long numElementsHint) throws HyracksDataException, IndexException {
+ LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
+ return createDiskComponent(bulkLoadComponentFactory, componentFileRefs.getInsertIndexFileReference(),
+ componentFileRefs.getBloomFilterFileReference(), numElementsHint, numHashes, true);
}
@Override
@@ -498,31 +508,29 @@
public class LSMBTreeBulkLoader implements IIndexBulkLoader {
private final ILSMComponent component;
private final BTreeBulkLoader bulkLoader;
- private long numElements;
+ private final IIndexBulkLoader builder;
+ private boolean endCalledBasedOnFailure = false;
- public LSMBTreeBulkLoader(float fillFactor, boolean verifyInput) throws TreeIndexException {
+ public LSMBTreeBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+ throws TreeIndexException, HyracksDataException {
try {
- numElements = 0L;
- LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
- component = (LSMBTreeImmutableComponent) bulkLoadComponentFactory
- .createLSMComponentInstance(new LSMComponentFileReferences(componentFileRefs
- .getInsertIndexFileReference(), null, componentFileRefs.getBloomFilterFileReference()));
- ((LSMBTreeImmutableComponent) component).getBTree().create();
- ((LSMBTreeImmutableComponent) component).getBTree().activate();
+ component = createBulkLoadTarget(numElementsHint);
} catch (HyracksDataException e) {
throw new TreeIndexException(e);
} catch (IndexException e) {
throw new TreeIndexException(e);
}
bulkLoader = (BTreeBulkLoader) ((LSMBTreeImmutableComponent) component).getBTree().createBulkLoader(
- fillFactor, verifyInput);
+ fillFactor, verifyInput, numElementsHint);
+ builder = ((LSMBTreeImmutableComponent) component).getBloomFilter().createBuilder(numElementsHint,
+ numHashes);
}
@Override
public void add(ITupleReference tuple) throws IndexException, HyracksDataException {
try {
bulkLoader.add(tuple);
- ++numElements;
+ builder.add(tuple);
} catch (IndexException e) {
handleException();
throw e;
@@ -535,33 +543,21 @@
}
}
- protected void handleException() throws HyracksDataException {
+ protected void handleException() throws HyracksDataException, IndexException {
((LSMBTreeImmutableComponent) component).getBTree().deactivate();
+ if (!endCalledBasedOnFailure) {
+ builder.end();
+ }
+ ((LSMBTreeImmutableComponent) component).getBloomFilter().deactivate();
((LSMBTreeImmutableComponent) component).getBTree().destroy();
+ ((LSMBTreeImmutableComponent) component).getBloomFilter().destroy();
}
@Override
public void end() throws HyracksDataException, IndexException {
bulkLoader.end();
- IIndexAccessor accessor = ((LSMBTreeImmutableComponent) component).getBTree().createAccessor(
- NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- IIndexCursor scanCursor = accessor.createSearchCursor();
- RangePredicate nullPred = new RangePredicate(null, null, true, true, null, null);
- accessor.search(scanCursor, nullPred);
- long[] hashes = new long[2];
- int fileid1 = ((LSMBTreeImmutableComponent) component).getBTree().getFileId();
- int fileid2 = ((LSMBTreeImmutableComponent) component).getBloomFilter().getFileId();
- BloomFilter bf = ((LSMBTreeImmutableComponent) component).getBloomFilter();
- bf.create(numElements, numHashes);
- bf.activate();
- try {
- while (scanCursor.hasNext()) {
- scanCursor.next();
- bf.add(scanCursor.getTuple(), hashes);
- }
- } finally {
- scanCursor.close();
- }
+ builder.end();
+ endCalledBasedOnFailure = true;
lsmHarness.addBulkLoadedComponent(component);
}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexBulkLoadOperatorDescriptor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexBulkLoadOperatorDescriptor.java
index 6b07608..da3cad5 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexBulkLoadOperatorDescriptor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexBulkLoadOperatorDescriptor.java
@@ -37,13 +37,14 @@
private final int[] fieldPermutation;
private final boolean verifyInput;
+ private final long numElementsHint;
public LSMInvertedIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] fieldPermutation,
- boolean verifyInput, IStorageManagerInterface storageManager, IFileSplitProvider fileSplitProvider,
- IIndexLifecycleManagerProvider lifecycleManagerProvider, ITypeTraits[] tokenTypeTraits,
- IBinaryComparatorFactory[] tokenComparatorFactories, ITypeTraits[] invListsTypeTraits,
- IBinaryComparatorFactory[] invListComparatorFactories, IBinaryTokenizerFactory tokenizerFactory,
- IIndexDataflowHelperFactory invertedIndexDataflowHelperFactory,
+ boolean verifyInput, long numElementsHint, IStorageManagerInterface storageManager,
+ IFileSplitProvider fileSplitProvider, IIndexLifecycleManagerProvider lifecycleManagerProvider,
+ ITypeTraits[] tokenTypeTraits, IBinaryComparatorFactory[] tokenComparatorFactories,
+ ITypeTraits[] invListsTypeTraits, IBinaryComparatorFactory[] invListComparatorFactories,
+ IBinaryTokenizerFactory tokenizerFactory, IIndexDataflowHelperFactory invertedIndexDataflowHelperFactory,
IModificationOperationCallbackFactory modificationOpCallbackFactory) {
super(spec, 1, 0, null, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
@@ -51,12 +52,13 @@
NoOpOperationCallbackFactory.INSTANCE, modificationOpCallbackFactory);
this.fieldPermutation = fieldPermutation;
this.verifyInput = verifyInput;
+ this.numElementsHint = numElementsHint;
}
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
return new IndexBulkLoadOperatorNodePushable(this, ctx, partition, fieldPermutation, 1.0f, verifyInput,
- recordDescProvider);
+ numElementsHint, recordDescProvider);
}
}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index ca0e091..30a678d 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -408,7 +408,7 @@
memBTreeAccessor.search(scanCursor, nullPred);
// Bulk load the disk inverted index from the in-memory inverted index.
- IIndexBulkLoader invIndexBulkLoader = diskInvertedIndex.createBulkLoader(1.0f, false);
+ IIndexBulkLoader invIndexBulkLoader = diskInvertedIndex.createBulkLoader(1.0f, false, 0L);
try {
while (scanCursor.hasNext()) {
scanCursor.next();
@@ -429,7 +429,7 @@
deletedKeysBTreeAccessor.search(deletedKeysScanCursor, nullPred);
// Bulk load the deleted-keys BTree.
- IIndexBulkLoader deletedKeysBTreeBulkLoader = diskDeletedKeysBTree.createBulkLoader(1.0f, false);
+ IIndexBulkLoader deletedKeysBTreeBulkLoader = diskDeletedKeysBTree.createBulkLoader(1.0f, false, 0L);
try {
while (deletedKeysScanCursor.hasNext()) {
deletedKeysScanCursor.next();
@@ -484,7 +484,7 @@
IInvertedIndex mergedDiskInvertedIndex = component.getInvIndex();
IIndexCursor cursor = mergeOp.getCursor();
- IIndexBulkLoader invIndexBulkLoader = mergedDiskInvertedIndex.createBulkLoader(1.0f, true);
+ IIndexBulkLoader invIndexBulkLoader = mergedDiskInvertedIndex.createBulkLoader(1.0f, true, 0L);
try {
while (cursor.hasNext()) {
cursor.next();
@@ -512,15 +512,17 @@
}
@Override
- public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput) throws IndexException {
- return new LSMInvertedIndexBulkLoader(fillFactor, verifyInput);
+ public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+ throws IndexException {
+ return new LSMInvertedIndexBulkLoader(fillFactor, verifyInput, numElementsHint);
}
public class LSMInvertedIndexBulkLoader implements IIndexBulkLoader {
private final ILSMComponent component;
private final IIndexBulkLoader invIndexBulkLoader;
- public LSMInvertedIndexBulkLoader(float fillFactor, boolean verifyInput) throws IndexException {
+ public LSMInvertedIndexBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+ throws IndexException {
// Note that by using a flush target file name, we state that the
// new bulk loaded tree is "newer" than any other merged tree.
try {
@@ -531,7 +533,7 @@
throw new TreeIndexException(e);
}
invIndexBulkLoader = ((LSMInvertedIndexImmutableComponent) component).getInvIndex().createBulkLoader(
- fillFactor, verifyInput);
+ fillFactor, verifyInput, numElementsHint);
}
@Override
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
index 668250c..d5a074e 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
@@ -191,7 +191,8 @@
}
@Override
- public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput) throws IndexException {
+ public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+ throws IndexException {
throw new UnsupportedOperationException("Bulk load not supported by in-memory inverted index.");
}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index f1552eb..afeaf90 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -76,7 +76,7 @@
protected final int invListEndPageIdField;
protected final int invListStartOffField;
protected final int invListNumElementsField;
-
+
// Type traits to be appended to the token type trait which finally form the BTree field type traits.
protected static final ITypeTraits[] btreeValueTypeTraits = new ITypeTraits[4];
static {
@@ -283,7 +283,7 @@
btreeTuple.getFieldStart(invListNumElementsField));
listCursor.reset(startPageId, endPageId, startOff, numElements);
}
-
+
public final class OnDiskInvertedIndexBulkLoader implements IIndexBulkLoader {
private final ArrayTupleBuilder btreeTupleBuilder;
private final ArrayTupleReference btreeTupleReference;
@@ -302,8 +302,8 @@
private final boolean verifyInput;
private final MultiComparator allCmp;
- public OnDiskInvertedIndexBulkLoader(float btreeFillFactor, boolean verifyInput, int startPageId, int fileId)
- throws IndexException, HyracksDataException {
+ public OnDiskInvertedIndexBulkLoader(float btreeFillFactor, boolean verifyInput, long numElementsHint,
+ int startPageId, int fileId) throws IndexException, HyracksDataException {
this.verifyInput = verifyInput;
this.tokenCmp = MultiComparator.create(btree.getComparatorFactories());
this.invListCmp = MultiComparator.create(invListCmpFactories);
@@ -316,7 +316,7 @@
this.btreeTupleReference = new ArrayTupleReference();
this.lastTupleBuilder = new ArrayTupleBuilder(numTokenFields + numInvListKeys);
this.lastTuple = new ArrayTupleReference();
- this.btreeBulkloader = btree.createBulkLoader(btreeFillFactor, verifyInput);
+ this.btreeBulkloader = btree.createBulkLoader(btreeFillFactor, verifyInput, numElementsHint);
currentPageId = startPageId;
currentPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, currentPageId), true);
currentPage.acquireWriteLatch();
@@ -477,7 +477,7 @@
this.index = index;
this.searcher = searcher;
}
-
+
@Override
public IIndexCursor createSearchCursor() {
return new OnDiskInvertedIndexSearchCursor(searcher, index.getInvListTypeTraits().length);
@@ -563,9 +563,10 @@
}
@Override
- public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput) throws IndexException {
+ public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+ throws IndexException {
try {
- return new OnDiskInvertedIndexBulkLoader(fillFactor, verifyInput, rootPageId, fileId);
+ return new OnDiskInvertedIndexBulkLoader(fillFactor, verifyInput, numElementsHint, rootPageId, fileId);
} catch (HyracksDataException e) {
throw new InvertedIndexException(e);
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 8ce3dc9..1e8766a 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -251,7 +251,7 @@
if (!isEmpty) {
rTreeTupleSorter.sort();
}
- rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false);
+ rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false, 0L);
cursor = rTreeTupleSorter;
try {
@@ -274,7 +274,7 @@
BTree diskBTree = component.getBTree();
// BulkLoad the tuples from the in-memory tree into the new disk BTree.
- IIndexBulkLoader bTreeBulkloader = diskBTree.createBulkLoader(1.0f, false);
+ IIndexBulkLoader bTreeBulkloader = diskBTree.createBulkLoader(1.0f, false, 0L);
try {
while (btreeScanCursor.hasNext()) {
btreeScanCursor.next();
@@ -330,7 +330,7 @@
RTree mergedRTree = component.getRTree();
BTree mergedBTree = component.getBTree();
- IIndexBulkLoader bulkloader = mergedRTree.createBulkLoader(1.0f, false);
+ IIndexBulkLoader bulkloader = mergedRTree.createBulkLoader(1.0f, false, 0L);
try {
while (cursor.hasNext()) {
cursor.next();
@@ -343,7 +343,7 @@
bulkloader.end();
// Load an empty BTree tree.
- mergedBTree.createBulkLoader(1.0f, false).end();
+ mergedBTree.createBulkLoader(1.0f, false, 0L).end();
return new LSMRTreeImmutableComponent(mergedRTree, mergedBTree);
}
@@ -377,15 +377,17 @@
}
@Override
- public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput) throws TreeIndexException {
- return new LSMRTreeBulkLoader(fillLevel, verifyInput);
+ public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
+ throws TreeIndexException {
+ return new LSMRTreeBulkLoader(fillLevel, verifyInput, numElementsHint);
}
public class LSMRTreeBulkLoader implements IIndexBulkLoader {
private final ILSMComponent component;
private final IIndexBulkLoader bulkLoader;
- public LSMRTreeBulkLoader(float fillFactor, boolean verifyInput) throws TreeIndexException {
+ public LSMRTreeBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+ throws TreeIndexException {
// Note that by using a flush target file name, we state that the
// new bulk loaded tree is "newer" than any other merged tree.
try {
@@ -395,7 +397,8 @@
} catch (IndexException e) {
throw new TreeIndexException(e);
}
- bulkLoader = ((LSMRTreeImmutableComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput);
+ bulkLoader = ((LSMRTreeImmutableComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput,
+ numElementsHint);
}
@Override
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 6c8fe88..1131ace 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -280,7 +280,7 @@
bTreeTupleSorter.sort();
}
- IIndexBulkLoader rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false);
+ IIndexBulkLoader rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false, 0L);
LSMRTreeFlushCursor cursor = new LSMRTreeFlushCursor(rTreeTupleSorter, bTreeTupleSorter, comparatorFields,
linearizerArray);
cursor.open(null, null);
@@ -336,7 +336,7 @@
LSMRTreeImmutableComponent component = createDiskComponent(componentFactory, mergeOp.getRTreeMergeTarget(),
null, true);
RTree mergedRTree = component.getRTree();
- IIndexBulkLoader bulkloader = mergedRTree.createBulkLoader(1.0f, false);
+ IIndexBulkLoader bulkloader = mergedRTree.createBulkLoader(1.0f, false, 0L);
try {
while (cursor.hasNext()) {
cursor.next();
@@ -373,8 +373,9 @@
}
@Override
- public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput) throws TreeIndexException {
- return new LSMRTreeWithAntiMatterTuplesBulkLoader(fillLevel, verifyInput);
+ public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
+ throws TreeIndexException {
+ return new LSMRTreeWithAntiMatterTuplesBulkLoader(fillLevel, verifyInput, numElementsHint);
}
private ILSMComponent createBulkLoadTarget() throws HyracksDataException, IndexException {
@@ -386,7 +387,8 @@
private final ILSMComponent component;
private final IIndexBulkLoader bulkLoader;
- public LSMRTreeWithAntiMatterTuplesBulkLoader(float fillFactor, boolean verifyInput) throws TreeIndexException {
+ public LSMRTreeWithAntiMatterTuplesBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+ throws TreeIndexException {
// Note that by using a flush target file name, we state that the
// new bulk loaded tree is "newer" than any other merged tree.
try {
@@ -396,7 +398,8 @@
} catch (IndexException e) {
throw new TreeIndexException(e);
}
- bulkLoader = ((LSMRTreeImmutableComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput);
+ bulkLoader = ((LSMRTreeImmutableComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput,
+ numElementsHint);
}
@Override
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
index 773d593..c12dc50 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
@@ -848,7 +848,8 @@
}
@Override
- public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput) throws TreeIndexException {
+ public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+ throws TreeIndexException {
// TODO: verifyInput currently does nothing.
try {
return new RTreeBulkLoader(fillFactor);
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java
index 770a2ad..81f7fce 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java
@@ -591,7 +591,7 @@
LOGGER.info("Bulk loading " + ins + " tuples");
}
long start = System.currentTimeMillis();
- IIndexBulkLoader bulkLoader = treeIndex.createBulkLoader(0.7f, false);
+ IIndexBulkLoader bulkLoader = treeIndex.createBulkLoader(0.7f, false, ins);
ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldCount);
ArrayTupleReference tuple = new ArrayTupleReference();
for (int i = 0; i < ins; i++) {
@@ -656,7 +656,7 @@
treeIndex.activate();
// Load sorted records, and expect to fail at tuple i.
- IIndexBulkLoader bulkLoader = treeIndex.createBulkLoader(0.7f, true);
+ IIndexBulkLoader bulkLoader = treeIndex.createBulkLoader(0.7f, true, ins);
for (int j = 0; j < ins; j++) {
if (j > i) {
fail("Bulk load failure test unexpectedly succeeded past tuple: " + j);
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/TreeIndexTestUtils.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/TreeIndexTestUtils.java
index 818373c..1a80231 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/TreeIndexTestUtils.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/TreeIndexTestUtils.java
@@ -106,7 +106,7 @@
}
public void checkDiskOrderScan(IIndexTestContext ctx) throws Exception {
- try {
+ try {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Testing Disk-Order Scan.");
}
@@ -243,7 +243,7 @@
ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fieldCount);
ArrayTupleReference tuple = new ArrayTupleReference();
// Perform bulk load.
- IIndexBulkLoader bulkLoader = ctx.getIndex().createBulkLoader(0.7f, false);
+ IIndexBulkLoader bulkLoader = ctx.getIndex().createBulkLoader(0.7f, false, numTuples);
int c = 1;
for (CheckTuple checkTuple : checkTuples) {
if (LOGGER.isLoggable(Level.INFO)) {
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/AbstractRTreeExamplesTest.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/AbstractRTreeExamplesTest.java
index e911a98..f93e9b6 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/AbstractRTreeExamplesTest.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/AbstractRTreeExamplesTest.java
@@ -688,7 +688,7 @@
LOGGER.info("Bulk loading " + numInserts + " tuples");
}
long start = System.currentTimeMillis();
- IIndexBulkLoader bulkLoader = treeIndex.createBulkLoader(0.7f, false);
+ IIndexBulkLoader bulkLoader = treeIndex.createBulkLoader(0.7f, false, numInserts);
ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldCount);
ArrayTupleReference tuple = new ArrayTupleReference();
diff --git a/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/BloomFilterTest.java b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/BloomFilterTest.java
index e31131b..8856da1 100644
--- a/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/BloomFilterTest.java
+++ b/hyracks-tests/hyracks-storage-am-bloomfilter-test/src/test/java/edu/uci/ics/hyracks/storage/am/bloomfilter/BloomFilterTest.java
@@ -33,6 +33,7 @@
import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilter;
import edu.uci.ics.hyracks.storage.am.bloomfilter.util.AbstractBloomFilterTest;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
@SuppressWarnings("rawtypes")
@@ -45,7 +46,7 @@
}
@Test
- public void basicTest() throws Exception {
+ public void singleFieldTest() throws Exception {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("TESTING BLOOM FILTER");
}
@@ -59,8 +60,9 @@
BloomFilter bf = new BloomFilter(bufferCache, harness.getFileMapProvider(), harness.getFileReference(),
keyFields);
- bf.create(numElements, numHashes);
+ bf.create();
bf.activate();
+ IIndexBulkLoader builder = bf.createBuilder(numElements, numHashes);
int fieldCount = 2;
ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fieldCount);
@@ -78,28 +80,18 @@
keys.add(i);
}
+ // Insert tuples in the bloom filter
+ for (int i = 0; i < keys.size(); ++i) {
+ TupleUtils.createIntegerTuple(tupleBuilder, tuple, keys.get(i), i);
+ builder.add(tuple);
+ }
+ builder.end();
+
+ // Check all the inserted tuples can be found.
+
long[] hashes = new long[2];
- // Check against an empty bloom filter
for (int i = 0; i < keys.size(); ++i) {
TupleUtils.createIntegerTuple(tupleBuilder, tuple, keys.get(i), i);
- Assert.assertFalse(bf.contains(tuple, hashes));
- }
-
- // Check all the inserted tuples can be found
- for (int i = 0; i < keys.size(); ++i) {
- TupleUtils.createIntegerTuple(tupleBuilder, tuple, keys.get(i), i);
- bf.add(tuple, hashes);
- Assert.assertTrue(bf.contains(tuple, hashes));
- }
-
- // Deactivate the bllom filter
- bf.deactivate();
-
- // Activate the bloom filter and check the tuples again
- bf.activate();
- for (int i = 0; i < keys.size(); ++i) {
- TupleUtils.createIntegerTuple(tupleBuilder, tuple, keys.get(i), i);
- bf.add(tuple, hashes);
Assert.assertTrue(bf.contains(tuple, hashes));
}
@@ -122,8 +114,9 @@
BloomFilter bf = new BloomFilter(bufferCache, harness.getFileMapProvider(), harness.getFileReference(),
keyFields);
- bf.create(numElements, numHashes);
+ bf.create();
bf.activate();
+ IIndexBulkLoader builder = bf.createBuilder(numElements, numHashes);
int fieldCount = 5;
ISerializerDeserializer[] fieldSerdes = { UTF8StringSerializerDeserializer.INSTANCE,
@@ -132,16 +125,27 @@
ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fieldCount);
ArrayTupleReference tuple = new ArrayTupleReference();
- long[] hashes = new long[2];
int maxLength = 20;
+ ArrayList<String> s1 = new ArrayList<String>();
+ ArrayList<String> s2 = new ArrayList<String>();
+ ArrayList<String> s3 = new ArrayList<String>();
+ ArrayList<String> s4 = new ArrayList<String>();
for (int i = 0; i < numElements; ++i) {
- String s1 = randomString(rnd.nextInt() % maxLength, rnd);
- String s2 = randomString(rnd.nextInt() % maxLength, rnd);
- String s3 = randomString(rnd.nextInt() % maxLength, rnd);
- String s4 = randomString(rnd.nextInt() % maxLength, rnd);
- TupleUtils.createTuple(tupleBuilder, tuple, fieldSerdes, s1, s2, rnd.nextInt(), s3, s4);
+ s1.add(randomString(rnd.nextInt() % maxLength, rnd));
+ s2.add(randomString(rnd.nextInt() % maxLength, rnd));
+ s3.add(randomString(rnd.nextInt() % maxLength, rnd));
+ s4.add(randomString(rnd.nextInt() % maxLength, rnd));
+ }
- bf.add(tuple, hashes);
+ for (int i = 0; i < numElements; ++i) {
+ TupleUtils.createTuple(tupleBuilder, tuple, fieldSerdes, s1.get(i), s2.get(i), i, s3.get(i), s4.get(i));
+ builder.add(tuple);
+ }
+ builder.end();
+
+ long[] hashes = new long[2];
+ for (int i = 0; i < numElements; ++i) {
+ TupleUtils.createTuple(tupleBuilder, tuple, fieldSerdes, s1.get(i), s2.get(i), i, s3.get(i), s4.get(i));
Assert.assertTrue(bf.contains(tuple, hashes));
}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
index d0d2c77..64f58c3 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
@@ -228,7 +228,7 @@
throw new IllegalArgumentException("Invalid range: [" + begin + ", " + end + "]");
}
- IIndexBulkLoader bulkloader = index.createBulkLoader(1.0f, false);
+ IIndexBulkLoader bulkloader = index.createBulkLoader(1.0f, false, end - begin);
for (int i = begin; i <= end; i++) {
TupleUtils.createIntegerTuple(builder, tuple, i);
bulkloader.add(tuple);
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/BTreeBulkLoadRunner.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/BTreeBulkLoadRunner.java
index 2e9395f..69e2b58 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/BTreeBulkLoadRunner.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/BTreeBulkLoadRunner.java
@@ -37,7 +37,7 @@
public long runExperiment(DataGenThread dataGen, int numThreads) throws Exception {
btree.create();
long start = System.currentTimeMillis();
- IIndexBulkLoader bulkLoader = btree.createBulkLoader(1.0f, false);
+ IIndexBulkLoader bulkLoader = btree.createBulkLoader(1.0f, false, 0L);
for (int i = 0; i < numBatches; i++) {
TupleBatch batch = dataGen.tupleBatchQueue.take();
for (int j = 0; j < batch.size(); j++) {
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
index 7b54884..97f78f3 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
@@ -205,7 +205,7 @@
ISerializerDeserializer[] fieldSerdes = testCtx.getFieldSerdes();
// Use the expected index to bulk-load the actual index.
- IIndexBulkLoader bulkLoader = testCtx.getIndex().createBulkLoader(1.0f, false);
+ IIndexBulkLoader bulkLoader = testCtx.getIndex().createBulkLoader(1.0f, false, numDocs);
ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(testCtx.getFieldSerdes().length);
ArrayTupleReference tuple = new ArrayTupleReference();
Iterator<CheckTuple> checkTupleIter = tmpMemIndex.iterator();