Add LSMDiskComponentBulkLoader
- Added LSMDiskComponentBulkLoader implementations, which are used to bulk
  load an LSMDiskComponent with anti-matter tuples
- Added LSMDiskComponentWithBuddyBTreeBulkLoader implementations,
  which are used to bulk load an LSMDiskComponent with deleted-key B-trees
- Refactored the LSM flush/merge/index bulk load operations to use
  the LSMDiskComponentBulkLoader
Change-Id: I772a6d68761fcbb85982a1c9f72f2d186e1d1ffb
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1773
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
index 1d8de79..c641dc1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
@@ -28,12 +28,9 @@
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.bloomfilter.impls.BloomCalculations;
import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory;
-import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
import org.apache.hyracks.storage.am.btree.impls.BTree;
-import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeBulkLoader;
import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
@@ -451,14 +448,11 @@
// modifications
public class LSMTwoPCBTreeBulkLoader implements IIndexBulkLoader, ITwoPCIndexBulkLoader {
private final ILSMDiskComponent component;
- private final BTreeBulkLoader bulkLoader;
- private final IIndexBulkLoader builder;
- private boolean cleanedUpArtifacts = false;
- private boolean isEmptyComponent = true;
- private boolean endedBloomFilterLoad = false;
- private final boolean isTransaction;
+ private final IIndexBulkLoader componentBulkLoader;
private final ITreeIndexTupleWriterFactory frameTupleWriterFactory;
+ private final boolean isTransaction;
+
public LSMTwoPCBTreeBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
boolean isTransaction) throws HyracksDataException {
this.isTransaction = isTransaction;
@@ -471,68 +465,23 @@
frameTupleWriterFactory =
((LSMBTreeDiskComponent) component).getBTree().getLeafFrameFactory().getTupleWriterFactory();
- bulkLoader = (BTreeBulkLoader) ((LSMBTreeDiskComponent) component).getBTree().createBulkLoader(fillFactor,
- verifyInput, numElementsHint, false);
- int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
- BloomFilterSpecification bloomFilterSpec =
- BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
- builder = ((LSMBTreeDiskComponent) component).getBloomFilter().createBuilder(numElementsHint,
- bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
+ componentBulkLoader =
+ createComponentBulkLoader(component, fillFactor, verifyInput, numElementsHint, false, true);
}
// It is expected that the mode was set to insert operation before
// calling add
@Override
public void add(ITupleReference tuple) throws HyracksDataException {
- try {
- bulkLoader.add(tuple);
- builder.add(tuple);
- } catch (Exception e) {
- cleanupArtifacts();
- throw e;
- }
- if (isEmptyComponent) {
- isEmptyComponent = false;
- }
- }
-
- // This is made public in case of a failure, it is better to delete all
- // created artifacts.
- public void cleanupArtifacts() throws HyracksDataException {
- if (!cleanedUpArtifacts) {
- cleanedUpArtifacts = true;
- // We make sure to end the bloom filter load to release latches.
- if (!endedBloomFilterLoad) {
- builder.end();
- endedBloomFilterLoad = true;
- }
- try {
- ((LSMBTreeDiskComponent) component).getBTree().deactivate();
- } catch (HyracksDataException e) {
- // Do nothing.. this could've bee
- }
- ((LSMBTreeDiskComponent) component).getBTree().destroy();
- try {
- ((LSMBTreeDiskComponent) component).getBloomFilter().deactivate();
- } catch (HyracksDataException e) {
- // Do nothing.. this could've bee
- }
- ((LSMBTreeDiskComponent) component).getBloomFilter().destroy();
- }
+ componentBulkLoader.add(tuple);
}
@Override
public void end() throws HyracksDataException {
- if (!cleanedUpArtifacts) {
- if (!endedBloomFilterLoad) {
- builder.end();
- endedBloomFilterLoad = true;
- }
- bulkLoader.end();
- if (isEmptyComponent) {
- cleanupArtifacts();
- } else if (isTransaction) {
+ componentBulkLoader.end();
+ if (component.getComponentSize() > 0) {
+ if (isTransaction) {
// Since this is a transaction component, validate and
// deactivate. it could later be added or deleted
markAsValid(component);
@@ -551,23 +500,14 @@
@Override
public void delete(ITupleReference tuple) throws HyracksDataException {
((LSMBTreeRefrencingTupleWriterFactory) frameTupleWriterFactory).setMode(IndexOperation.DELETE);
- try {
- bulkLoader.add(tuple);
- builder.add(tuple);
- } catch (Exception e) {
- cleanupArtifacts();
- throw e;
- }
- if (isEmptyComponent) {
- isEmptyComponent = false;
- }
+ componentBulkLoader.add(tuple);
((LSMBTreeRefrencingTupleWriterFactory) frameTupleWriterFactory).setMode(IndexOperation.INSERT);
}
@Override
public void abort() {
try {
- cleanupArtifacts();
+ componentBulkLoader.abort();
} catch (Exception e) {
// Do nothing
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
index fed4588..dfa08d6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
@@ -35,7 +35,6 @@
import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory;
import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
import org.apache.hyracks.storage.am.btree.impls.BTree;
-import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeBulkLoader;
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
@@ -70,6 +69,7 @@
import org.apache.hyracks.storage.common.IModificationOperationCallback;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.MultiComparator;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.file.IFileMapProvider;
@@ -274,6 +274,26 @@
newerList.add(swapIndex, newComponent);
}
+ @Override
+ public IIndexBulkLoader createComponentBulkLoader(ILSMDiskComponent component, float fillFactor,
+ boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter)
+ throws HyracksDataException {
+ BloomFilterSpecification bloomFilterSpec = null;
+ if (numElementsHint > 0) {
+ int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
+ bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
+ }
+ if (withFilter && filterFields != null) {
+ return new LSMBTreeWithBuddyDiskComponentBulkLoader((LSMBTreeWithBuddyDiskComponent) component,
+ bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager,
+ treeFields, filterFields,
+ MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories()));
+ } else {
+ return new LSMBTreeWithBuddyDiskComponentBulkLoader((LSMBTreeWithBuddyDiskComponent) component,
+ bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+ }
+ }
+
// For initial load
@Override
public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
@@ -372,6 +392,8 @@
LSMBTreeWithBuddyDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getTarget(),
mergeOp.getBuddyBTreeTarget(), mergeOp.getBloomFilterTarget(), true);
+ IIndexBulkLoader componentBulkLoader;
+
// In case we must keep the deleted-keys BuddyBTrees, then they must be
// merged *before* merging the b-trees so that
// lsmHarness.endSearch() is called once when the b-trees have been
@@ -383,46 +405,37 @@
LSMBuddyBTreeMergeCursor buddyBtreeCursor = new LSMBuddyBTreeMergeCursor(opCtx);
search(opCtx, buddyBtreeCursor, btreeSearchPred);
- BTree buddyBtree = mergedComponent.getBuddyBTree();
- IIndexBulkLoader buddyBtreeBulkLoader = buddyBtree.createBulkLoader(1.0f, true, 0L, false);
-
long numElements = 0L;
for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) {
numElements += ((LSMBTreeWithBuddyDiskComponent) mergeOp.getMergingComponents().get(i)).getBloomFilter()
.getNumElements();
}
- int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
- BloomFilterSpecification bloomFilterSpec =
- BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
- IIndexBulkLoader builder = mergedComponent.getBloomFilter().createBuilder(numElements,
- bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
+ componentBulkLoader = createComponentBulkLoader(mergedComponent, 1.0f, false, numElements, false, false);
try {
while (buddyBtreeCursor.hasNext()) {
buddyBtreeCursor.next();
ITupleReference tuple = buddyBtreeCursor.getTuple();
- buddyBtreeBulkLoader.add(tuple);
- builder.add(tuple);
+ ((LSMBTreeWithBuddyDiskComponentBulkLoader) componentBulkLoader).delete(tuple);
}
} finally {
buddyBtreeCursor.close();
- builder.end();
}
- buddyBtreeBulkLoader.end();
+ } else {
+ componentBulkLoader = createComponentBulkLoader(mergedComponent, 1.0f, false, 0L, false, false);
}
- IIndexBulkLoader bulkLoader = mergedComponent.getBTree().createBulkLoader(1.0f, false, 0L, false);
try {
while (cursor.hasNext()) {
cursor.next();
ITupleReference frameTuple = cursor.getTuple();
- bulkLoader.add(frameTuple);
+ componentBulkLoader.add(frameTuple);
}
} finally {
cursor.close();
}
- bulkLoader.end();
+ componentBulkLoader.end();
return mergedComponent;
}
@@ -589,12 +602,7 @@
// modifications
public class LSMTwoPCBTreeWithBuddyBulkLoader implements IIndexBulkLoader, ITwoPCIndexBulkLoader {
private final ILSMDiskComponent component;
- private final BTreeBulkLoader btreeBulkLoader;
- private final BTreeBulkLoader buddyBtreeBulkLoader;
- private final IIndexBulkLoader builder;
- private boolean cleanedUpArtifacts = false;
- private boolean isEmptyComponent = true;
- private boolean endedBloomFilterLoad = false;
+ private final IIndexBulkLoader componentBulkLoader;
private final boolean isTransaction;
public LSMTwoPCBTreeWithBuddyBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
@@ -607,69 +615,20 @@
component = createBulkLoadTarget();
}
- // Create the three loaders
- btreeBulkLoader = (BTreeBulkLoader) ((LSMBTreeWithBuddyDiskComponent) component).getBTree()
- .createBulkLoader(fillFactor, verifyInput, numElementsHint, false);
- buddyBtreeBulkLoader = (BTreeBulkLoader) ((LSMBTreeWithBuddyDiskComponent) component).getBuddyBTree()
- .createBulkLoader(fillFactor, verifyInput, numElementsHint, false);
- int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
- BloomFilterSpecification bloomFilterSpec =
- BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
- builder = ((LSMBTreeWithBuddyDiskComponent) component).getBloomFilter().createBuilder(numElementsHint,
- bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
+ componentBulkLoader =
+ createComponentBulkLoader(component, fillFactor, verifyInput, numElementsHint, false, true);
}
@Override
public void add(ITupleReference tuple) throws HyracksDataException {
- try {
- btreeBulkLoader.add(tuple);
- } catch (Exception e) {
- cleanupArtifacts();
- throw e;
- }
- if (isEmptyComponent) {
- isEmptyComponent = false;
- }
- }
-
- // This is made public in case of a failure, it is better to delete all
- // created artifacts.
- public void cleanupArtifacts() throws HyracksDataException {
- if (!cleanedUpArtifacts) {
- cleanedUpArtifacts = true;
- try {
- ((LSMBTreeWithBuddyDiskComponent) component).getBTree().deactivate();
- } catch (Exception e) {
-
- }
- ((LSMBTreeWithBuddyDiskComponent) component).getBTree().destroy();
- try {
- ((LSMBTreeWithBuddyDiskComponent) component).getBuddyBTree().deactivate();
- } catch (Exception e) {
-
- }
- ((LSMBTreeWithBuddyDiskComponent) component).getBuddyBTree().destroy();
- try {
- ((LSMBTreeWithBuddyDiskComponent) component).getBloomFilter().deactivate();
- } catch (Exception e) {
-
- }
- ((LSMBTreeWithBuddyDiskComponent) component).getBloomFilter().destroy();
- }
+ componentBulkLoader.add(tuple);
}
@Override
public void end() throws HyracksDataException {
- if (!cleanedUpArtifacts) {
- if (!endedBloomFilterLoad) {
- builder.end();
- endedBloomFilterLoad = true;
- }
- btreeBulkLoader.end();
- buddyBtreeBulkLoader.end();
- if (isEmptyComponent) {
- cleanupArtifacts();
- } else if (isTransaction) {
+ componentBulkLoader.end();
+ if (component.getComponentSize() > 0) {
+ if (isTransaction) {
// Since this is a transaction component, validate and
// deactivate. it could later be added or deleted
markAsValid(component);
@@ -687,22 +646,13 @@
@Override
public void delete(ITupleReference tuple) throws HyracksDataException {
- try {
- buddyBtreeBulkLoader.add(tuple);
- builder.add(tuple);
- } catch (Exception e) {
- cleanupArtifacts();
- throw e;
- }
- if (isEmptyComponent) {
- isEmptyComponent = false;
- }
+ ((LSMBTreeWithBuddyDiskComponentBulkLoader) componentBulkLoader).delete(tuple);
}
@Override
public void abort() {
try {
- cleanupArtifacts();
+ componentBulkLoader.abort();
} catch (Exception e) {
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 4cd4dc6..a4c67c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -73,6 +73,7 @@
import org.apache.hyracks.storage.common.IModificationOperationCallback;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.MultiComparator;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.file.IFileMapProvider;
@@ -292,12 +293,10 @@
RangePredicate nullPred = new RangePredicate(null, null, true, true, null, null);
long numElements = 0L;
- BloomFilterSpecification bloomFilterSpec = null;
if (hasBloomFilter) {
//count elements in btree for creating Bloomfilter
IIndexCursor countingCursor = ((BTreeAccessor) accessor).createCountingSearchCursor();
accessor.search(countingCursor, nullPred);
-
try {
while (countingCursor.hasNext()) {
countingCursor.next();
@@ -307,35 +306,23 @@
} finally {
countingCursor.close();
}
-
- int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
- bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
}
LSMBTreeDiskComponent component =
createDiskComponent(componentFactory, flushOp.getTarget(), flushOp.getBloomFilterTarget(), true);
- IIndexBulkLoader bulkLoader = component.getBTree().createBulkLoader(1.0f, false, numElements, false);
- IIndexBulkLoader builder = null;
- if (hasBloomFilter) {
- builder = component.getBloomFilter().createBuilder(numElements, bloomFilterSpec.getNumHashes(),
- bloomFilterSpec.getNumBucketsPerElements());
- }
+
+ IIndexBulkLoader componentBulkLoader =
+ createComponentBulkLoader(component, 1.0f, false, numElements, false, false);
IIndexCursor scanCursor = accessor.createSearchCursor(false);
accessor.search(scanCursor, nullPred);
try {
while (scanCursor.hasNext()) {
scanCursor.next();
- if (hasBloomFilter) {
- builder.add(scanCursor.getTuple());
- }
- bulkLoader.add(scanCursor.getTuple());
+ componentBulkLoader.add(scanCursor.getTuple());
}
} finally {
scanCursor.close();
- if (hasBloomFilter) {
- builder.end();
- }
}
if (component.getLSMComponentFilter() != null) {
@@ -353,7 +340,8 @@
// TODO This code should be in the callback and not in the index
flushingComponent.getMetadata().copy(component.getMetadata());
- bulkLoader.end();
+ componentBulkLoader.end();
+
return component;
}
@@ -368,38 +356,25 @@
List<ILSMComponent> mergedComponents = mergeOp.getMergingComponents();
long numElements = 0L;
- BloomFilterSpecification bloomFilterSpec = null;
if (hasBloomFilter) {
//count elements in btree for creating Bloomfilter
for (int i = 0; i < mergedComponents.size(); ++i) {
numElements += ((LSMBTreeDiskComponent) mergedComponents.get(i)).getBloomFilter().getNumElements();
}
- int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
- bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
}
LSMBTreeDiskComponent mergedComponent =
createDiskComponent(componentFactory, mergeOp.getTarget(), mergeOp.getBloomFilterTarget(), true);
- IIndexBulkLoader bulkLoader = mergedComponent.getBTree().createBulkLoader(1.0f, false, numElements, false);
- IIndexBulkLoader builder = null;
- if (hasBloomFilter) {
- builder = mergedComponent.getBloomFilter().createBuilder(numElements, bloomFilterSpec.getNumHashes(),
- bloomFilterSpec.getNumBucketsPerElements());
- }
+ IIndexBulkLoader componentBulkLoader =
+ createComponentBulkLoader(mergedComponent, 1.0f, false, numElements, false, false);
try {
while (cursor.hasNext()) {
cursor.next();
ITupleReference frameTuple = cursor.getTuple();
- if (hasBloomFilter) {
- builder.add(frameTuple);
- }
- bulkLoader.add(frameTuple);
+ componentBulkLoader.add(frameTuple);
}
} finally {
cursor.close();
- if (hasBloomFilter) {
- builder.end();
- }
}
if (mergedComponent.getLSMComponentFilter() != null) {
List<ITupleReference> filterTuples = new ArrayList<>();
@@ -410,7 +385,9 @@
getFilterManager().updateFilter(mergedComponent.getLSMComponentFilter(), filterTuples);
getFilterManager().writeFilter(mergedComponent.getLSMComponentFilter(), mergedComponent.getBTree());
}
- bulkLoader.end();
+
+ componentBulkLoader.end();
+
return mergedComponent;
}
@@ -438,6 +415,27 @@
}
@Override
+ public IIndexBulkLoader createComponentBulkLoader(ILSMDiskComponent component, float fillFactor,
+ boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter)
+ throws HyracksDataException {
+ BloomFilterSpecification bloomFilterSpec = null;
+ if (hasBloomFilter) {
+ int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
+ bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
+ }
+
+ if (withFilter && filterFields != null) {
+ return new LSMBTreeDiskComponentBulkLoader((LSMBTreeDiskComponent) component, bloomFilterSpec, fillFactor,
+ verifyInput, numElementsHint, checkIfEmptyIndex, filterManager, treeFields, filterFields,
+ MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories()));
+ } else {
+ return new LSMBTreeDiskComponentBulkLoader((LSMBTreeDiskComponent) component, bloomFilterSpec, fillFactor,
+ verifyInput, numElementsHint, checkIfEmptyIndex);
+ }
+
+ }
+
+ @Override
public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
throws HyracksDataException {
return new LSMBTreeBulkLoader(this, fillLevel, verifyInput, numElementsHint);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeBulkLoader.java
index 247b3de..239eba2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeBulkLoader.java
@@ -20,135 +20,43 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.bloomfilter.impls.BloomCalculations;
-import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
-import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeBulkLoader;
-import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
-import org.apache.hyracks.storage.common.MultiComparator;
public class LSMBTreeBulkLoader implements IIndexBulkLoader {
private final LSMBTree lsmIndex;
private final ILSMDiskComponent component;
- private final BTreeBulkLoader bulkLoader;
- private final IIndexBulkLoader builder;
- private boolean cleanedUpArtifacts = false;
- private boolean isEmptyComponent = true;
- private boolean endedBloomFilterLoad = false;
- public final PermutingTupleReference indexTuple;
- public final PermutingTupleReference filterTuple;
- public final MultiComparator filterCmp;
+ private final IIndexBulkLoader componentBulkLoader;
public LSMBTreeBulkLoader(LSMBTree lsmIndex, float fillFactor, boolean verifyInput, long numElementsHint)
throws HyracksDataException {
this.lsmIndex = lsmIndex;
- component = lsmIndex.createBulkLoadTarget();
- bulkLoader = (BTreeBulkLoader) ((LSMBTreeDiskComponent) component).getBTree().createBulkLoader(fillFactor,
- verifyInput, numElementsHint, false);
-
- if (lsmIndex.hasBloomFilter()) {
- int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
- BloomFilterSpecification bloomFilterSpec =
- BloomCalculations.computeBloomSpec(maxBucketsPerElement, lsmIndex.bloomFilterFalsePositiveRate());
- builder = ((LSMBTreeDiskComponent) component).getBloomFilter().createBuilder(numElementsHint,
- bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
- } else {
- builder = null;
- }
-
- if (lsmIndex.getFilterFields() != null) {
- indexTuple = new PermutingTupleReference(lsmIndex.getTreeFields());
- filterCmp = MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories());
- filterTuple = new PermutingTupleReference(lsmIndex.getFilterFields());
- } else {
- indexTuple = null;
- filterCmp = null;
- filterTuple = null;
- }
+ this.component = lsmIndex.createBulkLoadTarget();
+ this.componentBulkLoader =
+ lsmIndex.createComponentBulkLoader(component, fillFactor, verifyInput, numElementsHint, false, true);
}
@Override
public void add(ITupleReference tuple) throws HyracksDataException {
- try {
- ITupleReference t;
- if (indexTuple != null) {
- indexTuple.reset(tuple);
- t = indexTuple;
- } else {
- t = tuple;
- }
-
- bulkLoader.add(t);
- if (lsmIndex.hasBloomFilter()) {
- builder.add(t);
- }
-
- if (filterTuple != null) {
- filterTuple.reset(tuple);
- component.getLSMComponentFilter().update(filterTuple, filterCmp);
- }
- } catch (Exception e) {
- cleanupArtifacts();
- throw e;
- }
- if (isEmptyComponent) {
- isEmptyComponent = false;
- }
- }
-
- private void cleanupArtifacts() throws HyracksDataException {
- if (!cleanedUpArtifacts) {
- cleanedUpArtifacts = true;
- if (lsmIndex.hasBloomFilter() && !endedBloomFilterLoad) {
- builder.abort();
- endedBloomFilterLoad = true;
- }
- ((LSMBTreeDiskComponent) component).getBTree().deactivate();
- ((LSMBTreeDiskComponent) component).getBTree().destroy();
- if (lsmIndex.hasBloomFilter()) {
- ((LSMBTreeDiskComponent) component).getBloomFilter().deactivate();
- ((LSMBTreeDiskComponent) component).getBloomFilter().destroy();
- }
- }
+ componentBulkLoader.add(tuple);
}
@Override
public void end() throws HyracksDataException {
- if (!cleanedUpArtifacts) {
- if (lsmIndex.hasBloomFilter() && !endedBloomFilterLoad) {
- builder.end();
- endedBloomFilterLoad = true;
- }
-
- if (component.getLSMComponentFilter() != null) {
- lsmIndex.getFilterManager().writeFilter(component.getLSMComponentFilter(),
- ((LSMBTreeDiskComponent) component).getBTree());
- }
- bulkLoader.end();
-
- if (isEmptyComponent) {
- cleanupArtifacts();
- } else {
- //TODO(amoudi): Ensure Bulk load follow the same lifecycle Other Operations (Flush, Merge, etc).
- //then after operation should be called from harness as well
- //https://issues.apache.org/jira/browse/ASTERIXDB-1764
- lsmIndex.getIOOperationCallback().afterOperation(LSMOperationType.FLUSH, null, component);
- lsmIndex.getLsmHarness().addBulkLoadedComponent(component);
- }
+ componentBulkLoader.end();
+ if (component.getComponentSize() > 0) {
+ //TODO(amoudi): Ensure Bulk load follow the same lifecycle Other Operations (Flush, Merge, etc).
+ //then after operation should be called from harness as well
+ //https://issues.apache.org/jira/browse/ASTERIXDB-1764
+ lsmIndex.getIOOperationCallback().afterOperation(LSMOperationType.FLUSH, null, component);
+ lsmIndex.getLsmHarness().addBulkLoadedComponent(component);
}
}
@Override
public void abort() throws HyracksDataException {
- if (bulkLoader != null) {
- bulkLoader.abort();
- }
-
- if (builder != null) {
- builder.abort();
- }
-
+ componentBulkLoader.abort(); // propagate the abort; end() would complete the load instead
}
+
}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentBulkLoader.java
new file mode 100644
index 0000000..a5720de
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentBulkLoader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentBulkLoader;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public class LSMBTreeDiskComponentBulkLoader extends AbstractLSMDiskComponentBulkLoader {
+
+ //with filter
+ public LSMBTreeDiskComponentBulkLoader(LSMBTreeDiskComponent component, BloomFilterSpecification bloomFilterSpec,
+ float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex,
+ ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields, MultiComparator filterCmp)
+ throws HyracksDataException {
+ super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager,
+ indexFields, filterFields, filterCmp);
+ }
+
+ //without filter
+ public LSMBTreeDiskComponentBulkLoader(LSMBTreeDiskComponent component, BloomFilterSpecification bloomFilterSpec,
+ float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex)
+ throws HyracksDataException {
+ super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, null, null, null,
+ null);
+ }
+
+ @Override
+ protected BloomFilter getBloomFilter(ILSMDiskComponent component) {
+ return ((LSMBTreeDiskComponent) component).getBloomFilter();
+ }
+
+ @Override
+ protected IIndex getIndex(ILSMDiskComponent component) {
+ return ((LSMBTreeDiskComponent) component).getBTree();
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponentBulkLoader.java
new file mode 100644
index 0000000..e4ab6d4
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponentBulkLoader.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentWithBuddyBulkLoader;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+/**
+ * Bulk loader for an {@link LSMBTreeWithBuddyDiskComponent}. It tells the generic loading logic inherited from
+ * {@link AbstractLSMDiskComponentWithBuddyBulkLoader} where the component's BTree, buddy BTree and bloom filter
+ * live.
+ */
+public class LSMBTreeWithBuddyDiskComponentBulkLoader extends AbstractLSMDiskComponentWithBuddyBulkLoader {
+
+    /**
+     * Creates a loader that is additionally handed a filter manager, the index/filter field permutations and the
+     * filter comparator. All arguments are forwarded unchanged to the superclass constructor.
+     */
+    public LSMBTreeWithBuddyDiskComponentBulkLoader(LSMBTreeWithBuddyDiskComponent component,
+            BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex, ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields,
+            MultiComparator filterCmp) throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager,
+                indexFields, filterFields, filterCmp);
+    }
+
+    /**
+     * Creates a loader without filter support: the filter manager, both field permutations and the filter
+     * comparator are passed on as {@code null}.
+     */
+    public LSMBTreeWithBuddyDiskComponentBulkLoader(LSMBTreeWithBuddyDiskComponent component,
+            BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex) throws HyracksDataException {
+        this(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, null, null, null,
+                null);
+    }
+
+    /** Returns the bloom filter of the given component, which must be an {@link LSMBTreeWithBuddyDiskComponent}. */
+    @Override
+    protected BloomFilter getBloomFilter(ILSMDiskComponent component) {
+        LSMBTreeWithBuddyDiskComponent buddyComponent = (LSMBTreeWithBuddyDiskComponent) component;
+        return buddyComponent.getBloomFilter();
+    }
+
+    /** Returns the BTree of the given component, which must be an {@link LSMBTreeWithBuddyDiskComponent}. */
+    @Override
+    protected IIndex getIndex(ILSMDiskComponent component) {
+        LSMBTreeWithBuddyDiskComponent buddyComponent = (LSMBTreeWithBuddyDiskComponent) component;
+        return buddyComponent.getBTree();
+    }
+
+    /** Returns the buddy BTree of the given component, which must be an {@link LSMBTreeWithBuddyDiskComponent}. */
+    @Override
+    protected ITreeIndex getBuddyBTree(ILSMDiskComponent component) {
+        LSMBTreeWithBuddyDiskComponent buddyComponent = (LSMBTreeWithBuddyDiskComponent) component;
+        return buddyComponent.getBuddyBTree();
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index 2a9186b..5a6f391 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -27,6 +27,7 @@
import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMHarness;
import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.IIndexBulkLoader;
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
@@ -139,4 +140,20 @@
* @throws HyracksDataException
*/
void updateFilter(ILSMIndexOperationContext ictx, ITupleReference tuple) throws HyracksDataException;
+
+ /**
+ * Create a component bulk loader for the given component
+ *
+ * @param component the disk component to be bulk loaded
+ * @param fillFactor fill factor handed to the bulk loader(s) of the component's underlying index(es)
+ * @param verifyInput handed through to the bulk loader(s) of the component's underlying index(es)
+ * @param numElementsHint expected number of elements (NOTE(review): implementations appear to size the bloom filter from it)
+ * @param checkIfEmptyIndex handed through to the bulk loader(s) of the component's underlying index(es)
+ * @param withFilter whether the returned loader should also maintain the component's filter
+ * @return a bulk loader that loads tuples into the given component
+ * @throws HyracksDataException if the bulk loader cannot be created
+ */
+ IIndexBulkLoader createComponentBulkLoader(ILSMDiskComponent component, float fillFactor, boolean verifyInput,
+ long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter) throws HyracksDataException;
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentBulkLoader.java
new file mode 100644
index 0000000..964893a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentBulkLoader.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+/**
+ * Base {@link IIndexBulkLoader} for a single LSM disk component. Every tuple passed to
+ * {@link #add(ITupleReference)} is written to the component's index and, when a bloom filter specification was
+ * supplied, to a bloom filter builder; when a filter manager was supplied, the component filter is updated as
+ * well. Subclasses say where the index and the bloom filter of the concrete component type live.
+ */
+public abstract class AbstractLSMDiskComponentBulkLoader implements IIndexBulkLoader {
+    protected final ILSMDiskComponent component;
+
+    // Loader of the component's index; bloomFilterBuilder is null when no bloom filter is built.
+    protected final IIndexBulkLoader indexBulkLoader;
+    protected final IIndexBulkLoader bloomFilterBuilder;
+
+    // All four are null when the loader was created without a filter manager.
+    protected final ILSMComponentFilterManager filterManager;
+    protected final PermutingTupleReference indexTuple;
+    protected final PermutingTupleReference filterTuple;
+    protected final MultiComparator filterCmp;
+
+    // Set once the component's artifacts were destroyed; end() becomes a no-op afterwards.
+    protected boolean cleanedUpArtifacts = false;
+    // Stays true until the first tuple was added successfully.
+    protected boolean isEmptyComponent = true;
+    // Set once the bloom filter builder was either ended or aborted.
+    protected boolean endedBloomFilterLoad = false;
+
+    /**
+     * Creates the bulk loader of the component's index and, optionally, the bloom filter builder.
+     * <p>
+     * Note that {@link #getIndex(ILSMDiskComponent)} and {@link #getBloomFilter(ILSMDiskComponent)} are invoked
+     * from this constructor, i.e. before any subclass field is initialized.
+     *
+     * @param component the disk component being loaded
+     * @param bloomFilterSpec number of hashes / buckets per element for the bloom filter; when {@code null} no
+     *            bloom filter is built
+     * @param fillFactor passed to the index's {@code createBulkLoader}
+     * @param verifyInput passed to the index's {@code createBulkLoader}
+     * @param numElementsHint passed to the index's {@code createBulkLoader} and to the bloom filter builder
+     * @param checkIfEmptyIndex passed to the index's {@code createBulkLoader}
+     * @param filterManager when {@code null}, filter maintenance is disabled and the three remaining arguments
+     *            are ignored
+     * @param indexFields permutation selecting the fields of an incoming tuple that are written to the index
+     * @param filterFields permutation selecting the fields of an incoming tuple that update the component filter
+     * @param filterCmp comparator used when updating the component filter
+     * @throws HyracksDataException if the index bulk loader or the bloom filter builder cannot be created
+     */
+    public AbstractLSMDiskComponentBulkLoader(ILSMDiskComponent component, BloomFilterSpecification bloomFilterSpec,
+            float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex,
+            ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields, MultiComparator filterCmp)
+            throws HyracksDataException {
+        this.component = component;
+        this.indexBulkLoader =
+                getIndex(component).createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+        if (bloomFilterSpec != null) {
+            this.bloomFilterBuilder = getBloomFilter(component).createBuilder(numElementsHint,
+                    bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
+        } else {
+            this.bloomFilterBuilder = null;
+        }
+        if (filterManager != null) {
+            this.filterManager = filterManager;
+            this.indexTuple = new PermutingTupleReference(indexFields);
+            this.filterTuple = new PermutingTupleReference(filterFields);
+            this.filterCmp = filterCmp;
+        } else {
+            this.filterManager = null;
+            this.indexTuple = null;
+            this.filterTuple = null;
+            this.filterCmp = null;
+        }
+    }
+
+    /**
+     * Adds a tuple to the index and, if present, to the bloom filter, and updates the component filter when
+     * filter maintenance is enabled. On any failure the component's artifacts are cleaned up before the exception
+     * is rethrown.
+     *
+     * @param tuple the tuple to load
+     * @throws HyracksDataException if loading the tuple (or the subsequent cleanup) fails
+     */
+    @Override
+    public void add(ITupleReference tuple) throws HyracksDataException {
+        try {
+            ITupleReference t;
+            if (indexTuple != null) {
+                // Project the incoming tuple onto the index fields only.
+                indexTuple.reset(tuple);
+                t = indexTuple;
+            } else {
+                t = tuple;
+            }
+
+            indexBulkLoader.add(t);
+            if (bloomFilterBuilder != null) {
+                bloomFilterBuilder.add(t);
+            }
+
+            if (filterTuple != null) {
+                filterTuple.reset(tuple);
+                component.getLSMComponentFilter().update(filterTuple, filterCmp);
+            }
+        } catch (Exception e) {
+            cleanupArtifacts();
+            throw e;
+        }
+        isEmptyComponent = false;
+    }
+
+    /**
+     * Aborts the index bulk loader and then the bloom filter builder. The bloom filter builder is aborted even
+     * when aborting the index bulk loader throws, so that a failure of the first step does not leave the second
+     * loader un-aborted.
+     *
+     * @throws HyracksDataException if aborting either loader fails
+     */
+    @Override
+    public void abort() throws HyracksDataException {
+        try {
+            if (indexBulkLoader != null) {
+                indexBulkLoader.abort();
+            }
+        } finally {
+            if (bloomFilterBuilder != null) {
+                bloomFilterBuilder.abort();
+            }
+        }
+    }
+
+    /**
+     * Finishes the load. Does nothing if the artifacts were already cleaned up. Otherwise the bloom filter
+     * builder is ended (once), the component filter is written when a filter manager is present and the component
+     * has a filter, and the index bulk loader is ended. If no tuple was ever added, the artifacts are destroyed
+     * afterwards.
+     *
+     * @throws HyracksDataException if any of the steps fails
+     */
+    @Override
+    public void end() throws HyracksDataException {
+        if (!cleanedUpArtifacts) {
+            if (bloomFilterBuilder != null && !endedBloomFilterLoad) {
+                bloomFilterBuilder.end();
+                endedBloomFilterLoad = true;
+            }
+
+            //use filter
+            if (filterManager != null && component.getLSMComponentFilter() != null) {
+                filterManager.writeFilter(component.getLSMComponentFilter(), getTreeIndex(component));
+            }
+            indexBulkLoader.end();
+
+            if (isEmptyComponent) {
+                cleanupArtifacts();
+            }
+        }
+    }
+
+    /**
+     * Destroys what was created for the component: aborts the bloom filter builder if it was neither ended nor
+     * aborted yet, then deactivates and destroys the index and, if a bloom filter was being built, the bloom
+     * filter. Runs at most once.
+     *
+     * @throws HyracksDataException if aborting, deactivating or destroying fails
+     */
+    protected void cleanupArtifacts() throws HyracksDataException {
+        if (!cleanedUpArtifacts) {
+            cleanedUpArtifacts = true;
+            if (bloomFilterBuilder != null && !endedBloomFilterLoad) {
+                bloomFilterBuilder.abort();
+                endedBloomFilterLoad = true;
+            }
+            getIndex(component).deactivate();
+            getIndex(component).destroy();
+            if (bloomFilterBuilder != null) {
+                getBloomFilter(component).deactivate();
+                getBloomFilter(component).destroy();
+            }
+        }
+    }
+
+    /**
+     * TreeIndex is used to hold the filter tuple values
+     *
+     * @param component the disk component being loaded
+     * @return the component's index cast to {@link ITreeIndex}; subclasses whose index is not a tree index are
+     *         expected to override this
+     */
+    protected ITreeIndex getTreeIndex(ILSMDiskComponent component) {
+        return (ITreeIndex) getIndex(component);
+    }
+
+    /** Returns the index of the given component that tuples are bulk loaded into. */
+    protected abstract IIndex getIndex(ILSMDiskComponent component);
+
+    /** Returns the bloom filter of the given component. */
+    protected abstract BloomFilter getBloomFilter(ILSMDiskComponent component);
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentWithBuddyBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentWithBuddyBulkLoader.java
new file mode 100644
index 0000000..453d6cf
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentWithBuddyBulkLoader.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+/**
+ * Bulk loader for a disk component that owns a buddy BTree next to its index. Tuples passed to
+ * {@link #add(ITupleReference)} go to the index; tuples passed to {@link #delete(ITupleReference)} go to the
+ * buddy BTree and, if one is being built, to the bloom filter.
+ */
+public abstract class AbstractLSMDiskComponentWithBuddyBulkLoader extends AbstractLSMDiskComponentBulkLoader {
+
+    protected final IIndexBulkLoader buddyBTreeBulkLoader;
+
+    /**
+     * Creates the loaders of the index and bloom filter (see the superclass constructor) plus the bulk loader of
+     * the buddy BTree, which is created with the same fill factor, verification flag, element hint and
+     * empty-check flag as the index loader.
+     * <p>
+     * Note that {@link #getBuddyBTree(ILSMDiskComponent)} is invoked from this constructor, i.e. before any
+     * subclass field is initialized.
+     *
+     * @throws HyracksDataException if any of the loaders cannot be created
+     */
+    public AbstractLSMDiskComponentWithBuddyBulkLoader(ILSMDiskComponent component,
+            BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex, ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields,
+            MultiComparator filterCmp) throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager,
+                indexFields, filterFields, filterCmp);
+
+        // BuddyBTree must be created even if it could be empty,
+        // since without it the component is not considered as valid.
+        buddyBTreeBulkLoader =
+                getBuddyBTree(component).createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+    }
+
+    /**
+     * Adds a tuple to the index and updates the component filter when filter maintenance is enabled. Unlike the
+     * superclass implementation, the tuple is not added to the bloom filter; here the bloom filter is fed by
+     * {@link #delete(ITupleReference)} only. On any failure the artifacts are cleaned up before rethrowing.
+     *
+     * @param tuple the tuple to load
+     * @throws HyracksDataException if loading the tuple (or the subsequent cleanup) fails
+     */
+    @Override
+    public void add(ITupleReference tuple) throws HyracksDataException {
+        try {
+            ITupleReference t;
+            if (indexTuple != null) {
+                // Project the incoming tuple onto the index fields only.
+                indexTuple.reset(tuple);
+                t = indexTuple;
+            } else {
+                t = tuple;
+            }
+
+            indexBulkLoader.add(t);
+
+            if (filterTuple != null) {
+                filterTuple.reset(tuple);
+                component.getLSMComponentFilter().update(filterTuple, filterCmp);
+            }
+        } catch (Exception e) {
+            cleanupArtifacts();
+            throw e;
+        }
+        isEmptyComponent = false;
+    }
+
+    /**
+     * Records a deleted key: the tuple is added to the buddy BTree and, if one is being built, to the bloom
+     * filter. A {@code DUPLICATE_KEY} error is tolerated and the component still counts as non-empty afterwards;
+     * any other failure cleans up the artifacts and is rethrown.
+     *
+     * @param tuple the deleted key
+     * @throws HyracksDataException if recording the key fails for a reason other than a duplicate key
+     */
+    public void delete(ITupleReference tuple) throws HyracksDataException {
+        try {
+            buddyBTreeBulkLoader.add(tuple);
+            if (bloomFilterBuilder != null) {
+                bloomFilterBuilder.add(tuple);
+            }
+        } catch (HyracksDataException e) {
+            //deleting a key multiple times is OK
+            if (e.getErrorCode() != ErrorCode.DUPLICATE_KEY) {
+                cleanupArtifacts();
+                throw e;
+            }
+        } catch (Exception e) {
+            cleanupArtifacts();
+            throw e;
+        }
+        isEmptyComponent = false;
+    }
+
+    /**
+     * Aborts the loaders handled by the superclass and then the buddy BTree loader. The buddy BTree loader is
+     * aborted even when the superclass abort throws, so that a failure there does not leave it un-aborted.
+     *
+     * @throws HyracksDataException if aborting any loader fails
+     */
+    @Override
+    public void abort() throws HyracksDataException {
+        try {
+            super.abort();
+        } finally {
+            if (buddyBTreeBulkLoader != null) {
+                buddyBTreeBulkLoader.abort();
+            }
+        }
+    }
+
+    /**
+     * Finishes the load. Does nothing if the artifacts were already cleaned up. Otherwise the bloom filter
+     * builder is ended (once), the component filter is written when a filter manager is present and the component
+     * has a filter, and the index and buddy BTree loaders are ended. If nothing was ever added or deleted, the
+     * artifacts are destroyed afterwards.
+     *
+     * @throws HyracksDataException if any of the steps fails
+     */
+    @Override
+    public void end() throws HyracksDataException {
+        if (!cleanedUpArtifacts) {
+            if (bloomFilterBuilder != null && !endedBloomFilterLoad) {
+                bloomFilterBuilder.end();
+                endedBloomFilterLoad = true;
+            }
+
+            //use filter
+            if (filterManager != null && component.getLSMComponentFilter() != null) {
+                filterManager.writeFilter(component.getLSMComponentFilter(), getTreeIndex(component));
+            }
+            indexBulkLoader.end();
+            buddyBTreeBulkLoader.end();
+
+            if (isEmptyComponent) {
+                cleanupArtifacts();
+            }
+        }
+    }
+
+    /**
+     * Destroys what was created for the component: aborts the bloom filter builder if it was neither ended nor
+     * aborted yet, then deactivates and destroys the index, the buddy BTree and, if a bloom filter was being
+     * built, the bloom filter. Runs at most once.
+     *
+     * @throws HyracksDataException if aborting, deactivating or destroying fails
+     */
+    @Override
+    protected void cleanupArtifacts() throws HyracksDataException {
+        if (!cleanedUpArtifacts) {
+            cleanedUpArtifacts = true;
+            if (bloomFilterBuilder != null && !endedBloomFilterLoad) {
+                bloomFilterBuilder.abort();
+                endedBloomFilterLoad = true;
+            }
+            getIndex(component).deactivate();
+            getIndex(component).destroy();
+
+            getBuddyBTree(component).deactivate();
+            getBuddyBTree(component).destroy();
+
+            if (bloomFilterBuilder != null) {
+                getBloomFilter(component).deactivate();
+                getBloomFilter(component).destroy();
+            }
+        }
+    }
+
+    /** Returns the buddy BTree of the given component, into which deleted keys are loaded. */
+    protected abstract ITreeIndex getBuddyBTree(ILSMDiskComponent component);
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 2363c43..f827b21 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -339,39 +339,14 @@
// Create an inverted index instance to be bulk loaded.
LSMInvertedIndexDiskComponent component = createDiskInvIndexComponent(componentFactory, flushOp.getTarget(),
flushOp.getDeletedKeysBTreeTarget(), flushOp.getBloomFilterTarget(), true);
- IInvertedIndex diskInvertedIndex = component.getInvIndex();
// Create a scan cursor on the BTree underlying the in-memory inverted index.
LSMInvertedIndexMemoryComponent flushingComponent =
(LSMInvertedIndexMemoryComponent) flushOp.getFlushingComponent();
- InMemoryInvertedIndexAccessor memInvIndexAccessor = (InMemoryInvertedIndexAccessor) flushingComponent
- .getInvIndex().createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- BTreeAccessor memBTreeAccessor = memInvIndexAccessor.getBTreeAccessor();
+
RangePredicate nullPred = new RangePredicate(null, null, true, true, null, null);
- IIndexCursor scanCursor = memBTreeAccessor.createSearchCursor(false);
- memBTreeAccessor.search(scanCursor, nullPred);
- // Bulk load the disk inverted index from the in-memory inverted index.
- IIndexBulkLoader invIndexBulkLoader = diskInvertedIndex.createBulkLoader(1.0f, false, 0L, false);
- try {
- while (scanCursor.hasNext()) {
- scanCursor.next();
- invIndexBulkLoader.add(scanCursor.getTuple());
- }
- } finally {
- scanCursor.close();
- }
- if (component.getLSMComponentFilter() != null) {
- List<ITupleReference> filterTuples = new ArrayList<>();
- filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple());
- filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
- getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples);
- getFilterManager().writeFilter(component.getLSMComponentFilter(),
- ((OnDiskInvertedIndex) component.getInvIndex()).getBTree());
- }
- flushingComponent.getMetadata().copy(component.getMetadata());
- invIndexBulkLoader.end();
-
+ // Search the deleted keys BTree to calculate the number of elements for BloomFilter
IIndexAccessor deletedKeysBTreeAccessor = flushingComponent.getDeletedKeysBTree()
.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
IIndexCursor btreeCountingCursor = ((BTreeAccessor) deletedKeysBTreeAccessor).createCountingSearchCursor();
@@ -387,33 +362,50 @@
btreeCountingCursor.close();
}
- int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numBTreeTuples);
- BloomFilterSpecification bloomFilterSpec =
- BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
-
- // Create an BTree instance for the deleted keys.
- BTree diskDeletedKeysBTree = component.getDeletedKeysBTree();
+ IIndexBulkLoader componentBulkLoader =
+ createComponentBulkLoader(component, 1.0f, false, numBTreeTuples, false, false);
// Create a scan cursor on the deleted keys BTree underlying the in-memory inverted index.
IIndexCursor deletedKeysScanCursor = deletedKeysBTreeAccessor.createSearchCursor(false);
deletedKeysBTreeAccessor.search(deletedKeysScanCursor, nullPred);
- // Bulk load the deleted-keys BTree.
- IIndexBulkLoader deletedKeysBTreeBulkLoader = diskDeletedKeysBTree.createBulkLoader(1.0f, false, 0L, false);
- IIndexBulkLoader builder = component.getBloomFilter().createBuilder(numBTreeTuples,
- bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
-
try {
while (deletedKeysScanCursor.hasNext()) {
deletedKeysScanCursor.next();
- deletedKeysBTreeBulkLoader.add(deletedKeysScanCursor.getTuple());
- builder.add(deletedKeysScanCursor.getTuple());
+ ((LSMInvertedIndexDiskComponentBulkLoader) componentBulkLoader)
+ .delete(deletedKeysScanCursor.getTuple());
}
} finally {
deletedKeysScanCursor.close();
- builder.end();
}
- deletedKeysBTreeBulkLoader.end();
+
+ // Scan the in-memory inverted index
+ InMemoryInvertedIndexAccessor memInvIndexAccessor = (InMemoryInvertedIndexAccessor) flushingComponent
+ .getInvIndex().createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ BTreeAccessor memBTreeAccessor = memInvIndexAccessor.getBTreeAccessor();
+ IIndexCursor scanCursor = memBTreeAccessor.createSearchCursor(false);
+ memBTreeAccessor.search(scanCursor, nullPred);
+
+ // Bulk load the disk inverted index from the in-memory inverted index.
+ try {
+ while (scanCursor.hasNext()) {
+ scanCursor.next();
+ componentBulkLoader.add(scanCursor.getTuple());
+ }
+ } finally {
+ scanCursor.close();
+ }
+ if (component.getLSMComponentFilter() != null) {
+ List<ITupleReference> filterTuples = new ArrayList<>();
+ filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple());
+ filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
+ filterManager.updateFilter(component.getLSMComponentFilter(), filterTuples);
+ filterManager.writeFilter(component.getLSMComponentFilter(),
+ ((OnDiskInvertedIndex) component.getInvIndex()).getBTree());
+ }
+ flushingComponent.getMetadata().copy(component.getMetadata());
+
+ componentBulkLoader.end();
return component;
}
@@ -433,7 +425,7 @@
LSMInvertedIndexDiskComponent component = createDiskInvIndexComponent(componentFactory, mergeOp.getTarget(),
mergeOp.getDeletedKeysBTreeTarget(), mergeOp.getBloomFilterTarget(), true);
- IInvertedIndex mergedDiskInvertedIndex = component.getInvIndex();
+ IIndexBulkLoader componentBulkLoader;
// In case we must keep the deleted-keys BTrees, then they must be merged *before* merging the inverted indexes so that
// lsmHarness.endSearch() is called once when the inverted indexes have been merged.
@@ -445,44 +437,31 @@
new LSMInvertedIndexDeletedKeysBTreeMergeCursor(opCtx);
search(opCtx, btreeCursor, mergePred);
- BTree btree = component.getDeletedKeysBTree();
- IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f, true, 0L, false);
-
long numElements = 0L;
for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) {
numElements += ((LSMInvertedIndexDiskComponent) mergeOp.getMergingComponents().get(i)).getBloomFilter()
.getNumElements();
}
- int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
- BloomFilterSpecification bloomFilterSpec =
- BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
- IIndexBulkLoader builder = component.getBloomFilter().createBuilder(numElements,
- bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
+ componentBulkLoader = createComponentBulkLoader(component, 1.0f, false, numElements, false, false);
try {
while (btreeCursor.hasNext()) {
btreeCursor.next();
ITupleReference tuple = btreeCursor.getTuple();
- btreeBulkLoader.add(tuple);
- builder.add(tuple);
+ ((LSMInvertedIndexDiskComponentBulkLoader) componentBulkLoader).delete(tuple);
}
} finally {
btreeCursor.close();
- builder.end();
}
- btreeBulkLoader.end();
} else {
- BTree btree = component.getDeletedKeysBTree();
- IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f, true, 0L, false);
- btreeBulkLoader.end();
+ componentBulkLoader = createComponentBulkLoader(component, 1.0f, false, 0L, false, false);
}
- IIndexBulkLoader invIndexBulkLoader = mergedDiskInvertedIndex.createBulkLoader(1.0f, true, 0L, false);
try {
while (cursor.hasNext()) {
cursor.next();
ITupleReference tuple = cursor.getTuple();
- invIndexBulkLoader.add(tuple);
+ componentBulkLoader.add(tuple);
}
} finally {
cursor.close();
@@ -503,12 +482,33 @@
getFilterManager().writeFilter(component.getLSMComponentFilter(),
((OnDiskInvertedIndex) component.getInvIndex()).getBTree());
}
- invIndexBulkLoader.end();
+
+ componentBulkLoader.end();
return component;
}
@Override
+ public IIndexBulkLoader createComponentBulkLoader(ILSMDiskComponent component, float fillFactor,
+ boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter)
+ throws HyracksDataException {
+ BloomFilterSpecification bloomFilterSpec = null;
+ if (numElementsHint > 0) {
+ int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
+ bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
+ }
+ if (withFilter && filterFields != null) {
+ return new LSMInvertedIndexDiskComponentBulkLoader((LSMInvertedIndexDiskComponent) component,
+ bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager,
+ treeFields, filterFields,
+ MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories()));
+ } else {
+ return new LSMInvertedIndexDiskComponentBulkLoader((LSMInvertedIndexDiskComponent) component,
+ bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+ }
+ }
+
+ @Override
public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
throws HyracksDataException {
return new LSMInvertedIndexBulkLoader(fillFactor, verifyInput, numElementsHint);
@@ -516,105 +516,35 @@
public class LSMInvertedIndexBulkLoader implements IIndexBulkLoader {
private final ILSMDiskComponent component;
- private final IIndexBulkLoader invIndexBulkLoader;
- private final IIndexBulkLoader deletedKeysBTreeBulkLoader;
- private boolean cleanedUpArtifacts = false;
- private boolean isEmptyComponent = true;
- public final PermutingTupleReference indexTuple;
- public final PermutingTupleReference filterTuple;
- public final MultiComparator filterCmp;
+ private final IIndexBulkLoader componentBulkLoader;
public LSMInvertedIndexBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
throws HyracksDataException {
// Note that by using a flush target file name, we state that the
// new bulk loaded tree is "newer" than any other merged tree.
component = createBulkLoadTarget();
- invIndexBulkLoader = ((LSMInvertedIndexDiskComponent) component).getInvIndex().createBulkLoader(fillFactor,
- verifyInput, numElementsHint, false);
- //validity of the component depends on the deleted keys file being there even if it's empty.
- deletedKeysBTreeBulkLoader = ((LSMInvertedIndexDiskComponent) component).getDeletedKeysBTree()
- .createBulkLoader(fillFactor, verifyInput, numElementsHint, false);
-
- if (getFilterFields() != null) {
- indexTuple = new PermutingTupleReference(getTreeFields());
- filterCmp = MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories());
- filterTuple = new PermutingTupleReference(getFilterFields());
- } else {
- indexTuple = null;
- filterCmp = null;
- filterTuple = null;
- }
+ componentBulkLoader =
+ createComponentBulkLoader(component, fillFactor, verifyInput, numElementsHint, false, true);
}
@Override
public void add(ITupleReference tuple) throws HyracksDataException {
- try {
- ITupleReference t;
- if (indexTuple != null) {
- indexTuple.reset(tuple);
- t = indexTuple;
- } else {
- t = tuple;
- }
-
- invIndexBulkLoader.add(t);
-
- if (filterTuple != null) {
- filterTuple.reset(tuple);
- component.getLSMComponentFilter().update(filterTuple, filterCmp);
- }
-
- } catch (Exception e) {
- cleanupArtifacts();
- throw e;
- }
- if (isEmptyComponent) {
- isEmptyComponent = false;
- }
- }
-
- protected void cleanupArtifacts() throws HyracksDataException {
- if (!cleanedUpArtifacts) {
- cleanedUpArtifacts = true;
- ((LSMInvertedIndexDiskComponent) component).getInvIndex().deactivate();
- ((LSMInvertedIndexDiskComponent) component).getInvIndex().destroy();
- ((LSMInvertedIndexDiskComponent) component).getDeletedKeysBTree().deactivate();
- ((LSMInvertedIndexDiskComponent) component).getDeletedKeysBTree().destroy();
- ((LSMInvertedIndexDiskComponent) component).getBloomFilter().deactivate();
- ((LSMInvertedIndexDiskComponent) component).getBloomFilter().destroy();
- }
+ componentBulkLoader.add(tuple);
}
@Override
public void end() throws HyracksDataException {
- if (!cleanedUpArtifacts) {
- if (component.getLSMComponentFilter() != null) {
- getFilterManager().writeFilter(component.getLSMComponentFilter(),
- ((OnDiskInvertedIndex) ((LSMInvertedIndexDiskComponent) component).getInvIndex())
- .getBTree());
- }
- invIndexBulkLoader.end();
- deletedKeysBTreeBulkLoader.end();
-
- if (isEmptyComponent) {
- cleanupArtifacts();
- } else {
- ioOpCallback.afterOperation(LSMOperationType.FLUSH, null, component);
- getLsmHarness().addBulkLoadedComponent(component);
- }
+ componentBulkLoader.end();
+ if (component.getComponentSize() > 0) {
+ ioOpCallback.afterOperation(LSMOperationType.FLUSH, null, component);
+ lsmHarness.addBulkLoadedComponent(component);
}
}
@Override
public void abort() throws HyracksDataException {
- if (invIndexBulkLoader != null) {
- invIndexBulkLoader.abort();
- }
-
- if (deletedKeysBTreeBulkLoader != null) {
- deletedKeysBTreeBulkLoader.abort();
- }
+ componentBulkLoader.abort();
}
private ILSMDiskComponent createBulkLoadTarget() throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponentBulkLoader.java
new file mode 100644
index 0000000..38e1c42
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponentBulkLoader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.invertedindex.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentWithBuddyBulkLoader;
+import org.apache.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public class LSMInvertedIndexDiskComponentBulkLoader extends AbstractLSMDiskComponentWithBuddyBulkLoader {
+
+ //with filter
+ public LSMInvertedIndexDiskComponentBulkLoader(LSMInvertedIndexDiskComponent component,
+ BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint,
+ boolean checkIfEmptyIndex, ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields,
+ MultiComparator filterCmp) throws HyracksDataException {
+ super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager,
+ indexFields, filterFields, filterCmp);
+ }
+
+ //without filter
+ public LSMInvertedIndexDiskComponentBulkLoader(LSMInvertedIndexDiskComponent component,
+ BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint,
+ boolean checkIfEmptyIndex) throws HyracksDataException {
+ super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, null, null, null,
+ null);
+ }
+
+ @Override
+ protected BloomFilter getBloomFilter(ILSMDiskComponent component) {
+ return ((LSMInvertedIndexDiskComponent) component).getBloomFilter();
+ }
+
+ @Override
+ protected IIndex getIndex(ILSMDiskComponent component) {
+ return ((LSMInvertedIndexDiskComponent) component).getInvIndex();
+ }
+
+ @Override
+ protected ITreeIndex getTreeIndex(ILSMDiskComponent component) {
+ return ((OnDiskInvertedIndex) ((LSMInvertedIndexDiskComponent) component).getInvIndex()).getBTree();
+ }
+
+ @Override
+ protected ITreeIndex getBuddyBTree(ILSMDiskComponent component) {
+ return ((LSMInvertedIndexDiskComponent) component).getDeletedKeysBTree();
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 13ff420..3618737 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -71,6 +71,7 @@
import org.apache.hyracks.storage.common.IModificationOperationCallback;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.MultiComparator;
import org.apache.hyracks.storage.common.file.IFileMapProvider;
public class LSMRTree extends AbstractLSMRTree {
@@ -173,49 +174,11 @@
RTreeSearchCursor rtreeScanCursor = (RTreeSearchCursor) memRTreeAccessor.createSearchCursor(false);
SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
+
LSMRTreeDiskComponent component = createDiskComponent(componentFactory, flushOp.getTarget(),
flushOp.getBTreeTarget(), flushOp.getBloomFilterTarget(), true);
- RTree diskRTree = component.getRTree();
- IIndexBulkLoader rTreeBulkloader;
- ITreeIndexCursor cursor;
- IBinaryComparatorFactory[] linearizerArray = { linearizer };
-
- TreeTupleSorter rTreeTupleSorter = new TreeTupleSorter(flushingComponent.getRTree().getFileId(),
- linearizerArray, rtreeLeafFrameFactory.createFrame(), rtreeLeafFrameFactory.createFrame(),
- flushingComponent.getRTree().getBufferCache(), comparatorFields);
- // BulkLoad the tuples from the in-memory tree into the new disk
- // RTree.
-
- boolean isEmpty = true;
- try {
- while (rtreeScanCursor.hasNext()) {
- isEmpty = false;
- rtreeScanCursor.next();
- rTreeTupleSorter.insertTupleEntry(rtreeScanCursor.getPageId(), rtreeScanCursor.getTupleOffset());
- }
- } finally {
- rtreeScanCursor.close();
- }
- rTreeTupleSorter.sort();
-
- rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false, 0L, false);
- cursor = rTreeTupleSorter;
-
- if (!isEmpty) {
- try {
- while (cursor.hasNext()) {
- cursor.next();
- ITupleReference frameTuple = cursor.getTuple();
- rTreeBulkloader.add(frameTuple);
- }
- } finally {
- cursor.close();
- }
- }
-
- rTreeBulkloader.end();
-
+ // Count the buddy B-tree tuples; the count is the element hint that sizes the bloom filter.
ITreeIndexAccessor memBTreeAccessor = flushingComponent.getBTree()
.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null, null);
@@ -232,29 +195,55 @@
btreeCountingCursor.close();
}
- int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numBTreeTuples);
- BloomFilterSpecification bloomFilterSpec =
- BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
+ IIndexBulkLoader componentBulkLoader =
+ createComponentBulkLoader(component, 1.0f, false, numBTreeTuples, false, false);
+ ITreeIndexCursor cursor;
+ IBinaryComparatorFactory[] linearizerArray = { linearizer };
+
+ TreeTupleSorter rTreeTupleSorter = new TreeTupleSorter(flushingComponent.getRTree().getFileId(),
+ linearizerArray, rtreeLeafFrameFactory.createFrame(), rtreeLeafFrameFactory.createFrame(),
+ flushingComponent.getRTree().getBufferCache(), comparatorFields);
+
+ // BulkLoad the tuples from the in-memory tree into the new disk
+ // RTree.
+ boolean isEmpty = true;
+ try {
+ while (rtreeScanCursor.hasNext()) {
+ isEmpty = false;
+ rtreeScanCursor.next();
+ rTreeTupleSorter.insertTupleEntry(rtreeScanCursor.getPageId(), rtreeScanCursor.getTupleOffset());
+ }
+ } finally {
+ rtreeScanCursor.close();
+ }
+ rTreeTupleSorter.sort();
+
+ cursor = rTreeTupleSorter;
+
+ if (!isEmpty) {
+ try {
+ while (cursor.hasNext()) {
+ cursor.next();
+ ITupleReference frameTuple = cursor.getTuple();
+ componentBulkLoader.add(frameTuple);
+ }
+ } finally {
+ cursor.close();
+ }
+ }
+
+ // scan the memory BTree
IIndexCursor btreeScanCursor = memBTreeAccessor.createSearchCursor(false);
memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate);
- BTree diskBTree = component.getBTree();
-
- // BulkLoad the tuples from the in-memory tree into the new disk BTree.
- IIndexBulkLoader bTreeBulkloader = diskBTree.createBulkLoader(1.0f, false, numBTreeTuples, false);
- IIndexBulkLoader builder = component.getBloomFilter().createBuilder(numBTreeTuples,
- bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
- // scan the memory BTree
try {
while (btreeScanCursor.hasNext()) {
btreeScanCursor.next();
ITupleReference frameTuple = btreeScanCursor.getTuple();
- bTreeBulkloader.add(frameTuple);
- builder.add(frameTuple);
+ ((LSMRTreeDiskComponentBulkLoader) componentBulkLoader).delete(frameTuple);
}
} finally {
btreeScanCursor.close();
- builder.end();
}
if (component.getLSMComponentFilter() != null) {
@@ -266,7 +255,8 @@
}
// Note. If we change the filter to write to metadata object, we don't need the if block above
flushingComponent.getMetadata().copy(component.getMetadata());
- bTreeBulkloader.end();
+
+ componentBulkLoader.end();
return component;
}
@@ -282,40 +272,46 @@
LSMRTreeDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getTarget(),
mergeOp.getBTreeTarget(), mergeOp.getBloomFilterTarget(), true);
+ IIndexBulkLoader componentBulkLoader;
+
// In case we must keep the deleted-keys BTrees, then they must be merged *before* merging the r-trees so that
// lsmHarness.endSearch() is called once when the r-trees have been merged.
- BTree btree = mergedComponent.getBTree();
- IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f, true, 0L, false);
if (mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1) != diskComponents
.get(diskComponents.size() - 1)) {
// Keep the deleted tuples since the oldest disk component is not included in the merge operation
- LSMRTreeDeletedKeysBTreeMergeCursor btreeCursor = new LSMRTreeDeletedKeysBTreeMergeCursor(opCtx);
- search(opCtx, btreeCursor, rtreeSearchPred);
-
long numElements = 0L;
for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) {
numElements += ((LSMRTreeDiskComponent) mergeOp.getMergingComponents().get(i)).getBloomFilter()
.getNumElements();
}
+ componentBulkLoader = createComponentBulkLoader(mergedComponent, 1.0f, false, numElements, false, false);
- int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
- BloomFilterSpecification bloomFilterSpec =
- BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
- IIndexBulkLoader builder = mergedComponent.getBloomFilter().createBuilder(numElements,
- bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
-
+ LSMRTreeDeletedKeysBTreeMergeCursor btreeCursor = new LSMRTreeDeletedKeysBTreeMergeCursor(opCtx);
+ search(opCtx, btreeCursor, rtreeSearchPred);
try {
while (btreeCursor.hasNext()) {
btreeCursor.next();
ITupleReference tuple = btreeCursor.getTuple();
- btreeBulkLoader.add(tuple);
- builder.add(tuple);
+ ((LSMRTreeDiskComponentBulkLoader) componentBulkLoader).delete(tuple);
}
} finally {
btreeCursor.close();
- builder.end();
}
+ } else {
+ // The oldest disk component is included in the merge, so no deleted keys are kept (no buddy B-tree input).
+ componentBulkLoader = createComponentBulkLoader(mergedComponent, 1.0f, false, 0L, false, false);
+ }
+
+ // Bulk load the tuples returned by the cursor over the old R-tree components.
+ try {
+ while (cursor.hasNext()) {
+ cursor.next();
+ ITupleReference frameTuple = cursor.getTuple();
+ componentBulkLoader.add(frameTuple);
+ }
+ } finally {
+ cursor.close();
}
if (mergedComponent.getLSMComponentFilter() != null) {
@@ -327,19 +323,8 @@
getFilterManager().updateFilter(mergedComponent.getLSMComponentFilter(), filterTuples);
getFilterManager().writeFilter(mergedComponent.getLSMComponentFilter(), mergedComponent.getRTree());
}
- btreeBulkLoader.end();
- IIndexBulkLoader bulkLoader = mergedComponent.getRTree().createBulkLoader(1.0f, false, 0L, false);
- try {
- while (cursor.hasNext()) {
- cursor.next();
- ITupleReference frameTuple = cursor.getTuple();
- bulkLoader.add(frameTuple);
- }
- } finally {
- cursor.close();
- }
- bulkLoader.end();
+ componentBulkLoader.end();
return mergedComponent;
}
@@ -358,6 +343,25 @@
}
@Override
+ public IIndexBulkLoader createComponentBulkLoader(ILSMDiskComponent component, float fillFactor,
+ boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter)
+ throws HyracksDataException {
+ BloomFilterSpecification bloomFilterSpec = null;
+ if (numElementsHint > 0) {
+ int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
+ bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
+ }
+ if (withFilter && filterFields != null) {
+ return new LSMRTreeDiskComponentBulkLoader((LSMRTreeDiskComponent) component, bloomFilterSpec, fillFactor,
+ verifyInput, numElementsHint, checkIfEmptyIndex, filterManager, treeFields, filterFields,
+ MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories()));
+ } else {
+ return new LSMRTreeDiskComponentBulkLoader((LSMRTreeDiskComponent) component, bloomFilterSpec, fillFactor,
+ verifyInput, numElementsHint, checkIfEmptyIndex);
+ }
+ }
+
+ @Override
public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
throws HyracksDataException {
return new LSMRTreeBulkLoader(this, fillLevel, verifyInput, numElementsHint);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoader.java
index edc3e7d..fbdd37a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoader.java
@@ -20,110 +20,41 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
-import org.apache.hyracks.storage.common.MultiComparator;
public class LSMRTreeBulkLoader implements IIndexBulkLoader {
private final ILSMDiskComponent component;
- private final IIndexBulkLoader bulkLoader;
- private final IIndexBulkLoader buddyBTreeBulkloader;
- private boolean cleanedUpArtifacts = false;
- private boolean isEmptyComponent = true;
- public final PermutingTupleReference indexTuple;
- public final PermutingTupleReference filterTuple;
- public final MultiComparator filterCmp;
private final LSMRTree lsmIndex;
+ private final IIndexBulkLoader componentBulkLoader;
public LSMRTreeBulkLoader(LSMRTree lsmIndex, float fillFactor, boolean verifyInput, long numElementsHint)
throws HyracksDataException {
this.lsmIndex = lsmIndex;
// Note that by using a flush target file name, we state that the
// new bulk loaded tree is "newer" than any other merged tree.
- component = lsmIndex.createBulkLoadTarget();
- bulkLoader = ((LSMRTreeDiskComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput,
- numElementsHint, false);
- buddyBTreeBulkloader = ((LSMRTreeDiskComponent) component).getBTree().createBulkLoader(fillFactor, verifyInput,
- numElementsHint, false);
- if (lsmIndex.getFilterFields() != null) {
- indexTuple = new PermutingTupleReference(lsmIndex.getTreeFields());
- filterCmp = MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories());
- filterTuple = new PermutingTupleReference(lsmIndex.getFilterFields());
- } else {
- indexTuple = null;
- filterCmp = null;
- filterTuple = null;
- }
+ this.component = lsmIndex.createBulkLoadTarget();
+ this.componentBulkLoader =
+ lsmIndex.createComponentBulkLoader(component, fillFactor, verifyInput, numElementsHint, false, true);
}
@Override
public void add(ITupleReference tuple) throws HyracksDataException {
- try {
- ITupleReference t;
- if (indexTuple != null) {
- indexTuple.reset(tuple);
- t = indexTuple;
- } else {
- t = tuple;
- }
-
- bulkLoader.add(t);
-
- if (filterTuple != null) {
- filterTuple.reset(tuple);
- component.getLSMComponentFilter().update(filterTuple, filterCmp);
- }
- } catch (Exception e) {
- cleanupArtifacts();
- throw e;
- }
- if (isEmptyComponent) {
- isEmptyComponent = false;
- }
+ componentBulkLoader.add(tuple);
}
@Override
public void end() throws HyracksDataException {
- if (!cleanedUpArtifacts) {
-
- if (component.getLSMComponentFilter() != null) {
- lsmIndex.getFilterManager().writeFilter(component.getLSMComponentFilter(),
- ((LSMRTreeDiskComponent) component).getRTree());
- }
-
- bulkLoader.end();
- buddyBTreeBulkloader.end();
-
- if (isEmptyComponent) {
- cleanupArtifacts();
- } else {
- lsmIndex.getIOOperationCallback().afterOperation(LSMOperationType.FLUSH, null, component);
- lsmIndex.getLsmHarness().addBulkLoadedComponent(component);
- }
+ componentBulkLoader.end();
+ if (component.getComponentSize() > 0) {
+ lsmIndex.getIOOperationCallback().afterOperation(LSMOperationType.FLUSH, null, component);
+ lsmIndex.getLsmHarness().addBulkLoadedComponent(component);
}
}
@Override
public void abort() throws HyracksDataException {
- if (bulkLoader != null) {
- bulkLoader.abort();
- }
- if (buddyBTreeBulkloader != null) {
- buddyBTreeBulkloader.abort();
- }
- }
-
- protected void cleanupArtifacts() throws HyracksDataException {
- if (!cleanedUpArtifacts) {
- cleanedUpArtifacts = true;
- ((LSMRTreeDiskComponent) component).getRTree().deactivate();
- ((LSMRTreeDiskComponent) component).getRTree().destroy();
- ((LSMRTreeDiskComponent) component).getBTree().deactivate();
- ((LSMRTreeDiskComponent) component).getBTree().destroy();
- ((LSMRTreeDiskComponent) component).getBloomFilter().deactivate();
- ((LSMRTreeDiskComponent) component).getBloomFilter().destroy();
- }
+ componentBulkLoader.abort();
}
}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponentBulkLoader.java
new file mode 100644
index 0000000..ff0a299
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponentBulkLoader.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.rtree.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentWithBuddyBulkLoader;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public class LSMRTreeDiskComponentBulkLoader extends AbstractLSMDiskComponentWithBuddyBulkLoader {
+
+ //with filter
+ public LSMRTreeDiskComponentBulkLoader(LSMRTreeDiskComponent component, BloomFilterSpecification bloomFilterSpec,
+ float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex,
+ ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields, MultiComparator filterCmp)
+ throws HyracksDataException {
+ super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager,
+ indexFields, filterFields, filterCmp);
+ }
+
+ //without filter
+ public LSMRTreeDiskComponentBulkLoader(LSMRTreeDiskComponent component, BloomFilterSpecification bloomFilterSpec,
+ float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex)
+ throws HyracksDataException {
+ super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, null, null, null,
+ null);
+ }
+
+ @Override
+ protected BloomFilter getBloomFilter(ILSMDiskComponent component) {
+ return ((LSMRTreeDiskComponent) component).getBloomFilter();
+ }
+
+ @Override
+ protected IIndex getIndex(ILSMDiskComponent component) {
+ return ((LSMRTreeDiskComponent) component).getRTree();
+ }
+
+ @Override
+ protected ITreeIndex getBuddyBTree(ILSMDiskComponent component) {
+ return ((LSMRTreeDiskComponent) component).getBTree();
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index a20e6f2..24c46d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -35,7 +35,6 @@
import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.IComponentFilterHelper;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
@@ -136,14 +135,7 @@
SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
LSMRTreeDiskComponent component = createDiskComponent(componentFactory, flushOp.getTarget(), null, null, true);
- RTree diskRTree = component.getRTree();
-
- // scan the memory BTree
- ITreeIndexAccessor memBTreeAccessor = flushingComponent.getBTree()
- .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- BTreeRangeSearchCursor btreeScanCursor = (BTreeRangeSearchCursor) memBTreeAccessor.createSearchCursor(false);
- RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null, null);
- memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate);
+ IIndexBulkLoader componentBulkLoader = createComponentBulkLoader(component, 1.0f, false, 0L, false, false);
// Since the LSM-RTree is used as a secondary assumption, the
// primary key will be the last comparator in the BTree comparators
@@ -151,12 +143,6 @@
linearizerArray, rtreeLeafFrameFactory.createFrame(), rtreeLeafFrameFactory.createFrame(),
flushingComponent.getRTree().getBufferCache(), comparatorFields);
- TreeTupleSorter bTreeTupleSorter = new TreeTupleSorter(flushingComponent.getBTree().getFileId(),
- linearizerArray, btreeLeafFrameFactory.createFrame(), btreeLeafFrameFactory.createFrame(),
- flushingComponent.getBTree().getBufferCache(), comparatorFields);
- // BulkLoad the tuples from the in-memory tree into the new disk
- // RTree.
-
boolean isEmpty = true;
try {
while (rtreeScanCursor.hasNext()) {
@@ -171,6 +157,16 @@
rTreeTupleSorter.sort();
}
+ // scan the memory BTree
+ ITreeIndexAccessor memBTreeAccessor = flushingComponent.getBTree()
+ .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ BTreeRangeSearchCursor btreeScanCursor = (BTreeRangeSearchCursor) memBTreeAccessor.createSearchCursor(false);
+ RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null, null);
+ memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate);
+ TreeTupleSorter bTreeTupleSorter = new TreeTupleSorter(flushingComponent.getBTree().getFileId(),
+ linearizerArray, btreeLeafFrameFactory.createFrame(), btreeLeafFrameFactory.createFrame(),
+ flushingComponent.getBTree().getBufferCache(), comparatorFields);
+
isEmpty = true;
try {
while (btreeScanCursor.hasNext()) {
@@ -185,7 +181,6 @@
bTreeTupleSorter.sort();
}
- IIndexBulkLoader rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false, 0L, false);
LSMRTreeWithAntiMatterTuplesFlushCursor cursor = new LSMRTreeWithAntiMatterTuplesFlushCursor(rTreeTupleSorter,
bTreeTupleSorter, comparatorFields, linearizerArray);
cursor.open(null, null);
@@ -195,7 +190,7 @@
cursor.next();
ITupleReference frameTuple = cursor.getTuple();
- rTreeBulkloader.add(frameTuple);
+ componentBulkLoader.add(frameTuple);
}
} finally {
cursor.close();
@@ -209,8 +204,8 @@
getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getRTree());
}
flushingComponent.getMetadata().copy(component.getMetadata());
- rTreeBulkloader.end();
+ componentBulkLoader.end();
return component;
}
@@ -225,13 +220,13 @@
// Bulk load the tuples from all on-disk RTrees into the new RTree.
LSMRTreeDiskComponent component = createDiskComponent(componentFactory, mergeOp.getTarget(), null, null, true);
- RTree mergedRTree = component.getRTree();
- IIndexBulkLoader bulkloader = mergedRTree.createBulkLoader(1.0f, false, 0L, false);
+
+ IIndexBulkLoader componentBulkLoader = createComponentBulkLoader(component, 1.0f, false, 0L, false, false);
try {
while (cursor.hasNext()) {
cursor.next();
ITupleReference frameTuple = cursor.getTuple();
- bulkloader.add(frameTuple);
+ componentBulkLoader.add(frameTuple);
}
} finally {
cursor.close();
@@ -245,7 +240,8 @@
getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples);
getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getRTree());
}
- bulkloader.end();
+
+ componentBulkLoader.end();
return component;
}
@@ -258,6 +254,20 @@
}
@Override
+ public IIndexBulkLoader createComponentBulkLoader(ILSMDiskComponent component, float fillFactor,
+ boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter)
+ throws HyracksDataException {
+ if (withFilter && filterFields != null) {
+ return new LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader((LSMRTreeDiskComponent) component, null,
+ fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager, treeFields,
+ filterFields, MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories()));
+ } else {
+ return new LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader((LSMRTreeDiskComponent) component, null,
+ fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+ }
+ }
+
+ @Override
public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
throws HyracksDataException {
return new LSMRTreeWithAntiMatterTuplesBulkLoader(fillLevel, verifyInput, numElementsHint);
@@ -265,91 +275,35 @@
public class LSMRTreeWithAntiMatterTuplesBulkLoader implements IIndexBulkLoader {
private final ILSMDiskComponent component;
- private final IIndexBulkLoader bulkLoader;
- private boolean cleanedUpArtifacts = false;
- private boolean isEmptyComponent = true;
- public final PermutingTupleReference indexTuple;
- public final PermutingTupleReference filterTuple;
- public final MultiComparator filterCmp;
+ private final IIndexBulkLoader componentBulkLoader;
public LSMRTreeWithAntiMatterTuplesBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
throws HyracksDataException {
- // Note that by using a flush target file name, we state that the
- // new bulk loaded tree is "newer" than any other merged tree.
-
component = createBulkLoadTarget();
- bulkLoader = ((LSMRTreeDiskComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput,
- numElementsHint, false);
- if (getFilterFields() != null) {
- indexTuple = new PermutingTupleReference(getTreeFields());
- filterCmp = MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories());
- filterTuple = new PermutingTupleReference(getFilterFields());
- } else {
- indexTuple = null;
- filterCmp = null;
- filterTuple = null;
- }
+ componentBulkLoader =
+ createComponentBulkLoader(component, fillFactor, verifyInput, numElementsHint, false, true);
}
@Override
public void add(ITupleReference tuple) throws HyracksDataException {
- try {
- ITupleReference t;
- if (indexTuple != null) {
- indexTuple.reset(tuple);
- t = indexTuple;
- } else {
- t = tuple;
- }
-
- bulkLoader.add(t);
-
- if (filterTuple != null) {
- filterTuple.reset(tuple);
- component.getLSMComponentFilter().update(filterTuple, filterCmp);
- }
-
- } catch (Exception e) {
- cleanupArtifacts();
- throw e;
- }
- if (isEmptyComponent) {
- isEmptyComponent = false;
- }
+ componentBulkLoader.add(tuple);
}
@Override
public void end() throws HyracksDataException {
- if (!cleanedUpArtifacts) {
- if (component.getLSMComponentFilter() != null) {
- getFilterManager().writeFilter(component.getLSMComponentFilter(),
- ((LSMRTreeDiskComponent) component).getRTree());
- }
- bulkLoader.end();
-
- if (isEmptyComponent) {
- cleanupArtifacts();
- } else {
- ioOpCallback.afterOperation(LSMOperationType.FLUSH, null, component);
- getLsmHarness().addBulkLoadedComponent(component);
- }
+ componentBulkLoader.end();
+ if (component.getComponentSize() > 0) {
+ ioOpCallback.afterOperation(LSMOperationType.FLUSH, null, component);
+ lsmHarness.addBulkLoadedComponent(component);
}
}
@Override
public void abort() throws HyracksDataException {
- if (bulkLoader != null) {
- bulkLoader.abort();
- }
- }
-
- protected void cleanupArtifacts() throws HyracksDataException {
- if (!cleanedUpArtifacts) {
- cleanedUpArtifacts = true;
- ((LSMRTreeDiskComponent) component).getRTree().deactivate();
- ((LSMRTreeDiskComponent) component).getRTree().destroy();
+ if (componentBulkLoader != null) {
+ componentBulkLoader.abort();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader.java
new file mode 100644
index 0000000..88a2e1e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.rtree.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentBulkLoader;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public class LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader extends AbstractLSMDiskComponentBulkLoader {
+ // AbstractLSMDiskComponentBulkLoader subclass that resolves the bloom filter and R-tree of an LSMRTreeDiskComponent.
+ // Constructor with filter: forwards the filter manager, index/filter fields and filter comparator to the base class.
+ public LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader(LSMRTreeDiskComponent component,
+ BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint,
+ boolean checkIfEmptyIndex, ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields,
+ MultiComparator filterCmp) throws HyracksDataException {
+ super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager,
+ indexFields, filterFields, filterCmp);
+ }
+
+ // Constructor without filter: same leading arguments, with null passed for every filter-related argument.
+ public LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader(LSMRTreeDiskComponent component,
+ BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint,
+ boolean checkIfEmptyIndex) throws HyracksDataException {
+ super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, null, null, null,
+ null); // the four nulls take the places of filterManager, indexFields, filterFields and filterCmp
+ }
+
+ @Override
+ protected BloomFilter getBloomFilter(ILSMDiskComponent component) {
+ return ((LSMRTreeDiskComponent) component).getBloomFilter(); // NOTE(review): unchecked downcast - confirm callers
+ }
+
+ @Override
+ protected ITreeIndex getIndex(ILSMDiskComponent component) {
+ return ((LSMRTreeDiskComponent) component).getRTree(); // the R-tree is the index exposed for this component
+ }
+
+}