Allowed merge subset of the disk components correctly. Added new merge policy.
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index c8ed0d0..9f4bd0d 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -269,7 +269,7 @@
operationalComponents.addAll(immutableComponents);
break;
case MERGE:
- operationalComponents.addAll(immutableComponents);
+ // The merger is responsible of choosing and adding the components that are targeted for the merge operation.
break;
default:
throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
@@ -423,8 +423,12 @@
LSMBTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
opCtx.setOperation(IndexOperation.MERGE);
List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
- ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor(opCtx);
-
+ boolean returnDeletedTuples = false;
+ if (ctx.getComponentHolder().get(ctx.getComponentHolder().size() - 1) != diskComponents.get(diskComponents
+ .size() - 1)) {
+ returnDeletedTuples = true;
+ }
+ ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor(opCtx, returnDeletedTuples);
BTree firstBTree = (BTree) ((LSMBTreeDiskComponent) mergingComponents.get(0)).getBTree();
BTree lastBTree = (BTree) ((LSMBTreeDiskComponent) mergingComponents.get(mergingComponents.size() - 1))
.getBTree();
@@ -455,8 +459,8 @@
int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
BloomFilterSpecification bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement,
bloomFilterFalsePositiveRate);
- LSMBTreeDiskComponent mergedComponent = createDiskComponent(componentFactory,
- mergeOp.getBTreeMergeTarget(), mergeOp.getBloomFilterMergeTarget(), true);
+ LSMBTreeDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getBTreeMergeTarget(),
+ mergeOp.getBloomFilterMergeTarget(), true);
IIndexBulkLoader bulkLoader = mergedComponent.getBTree().createBulkLoader(1.0f, false, numElements, false);
IIndexBulkLoader builder = mergedComponent.getBloomFilter().createBuilder(numElements,
@@ -476,9 +480,8 @@
return mergedComponent;
}
- private LSMBTreeDiskComponent createDiskComponent(LSMBTreeDiskComponentFactory factory,
- FileReference btreeFileRef, FileReference bloomFilterFileRef, boolean createComponent)
- throws HyracksDataException, IndexException {
+ private LSMBTreeDiskComponent createDiskComponent(LSMBTreeDiskComponentFactory factory, FileReference btreeFileRef,
+ FileReference bloomFilterFileRef, boolean createComponent) throws HyracksDataException, IndexException {
// Create new BTree instance.
LSMBTreeDiskComponent component = (LSMBTreeDiskComponent) factory
.createLSMComponentInstance(new LSMComponentFileReferences(btreeFileRef, null, bloomFilterFileRef));
@@ -541,8 +544,8 @@
} catch (HyracksDataException | IndexException e) {
throw new TreeIndexException(e);
}
- bulkLoader = (BTreeBulkLoader) ((LSMBTreeDiskComponent) component).getBTree().createBulkLoader(
- fillFactor, verifyInput, numElementsHint, false);
+ bulkLoader = (BTreeBulkLoader) ((LSMBTreeDiskComponent) component).getBTree().createBulkLoader(fillFactor,
+ verifyInput, numElementsHint, false);
int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
BloomFilterSpecification bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement,
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
index 0b2d7cf..381b012 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
@@ -43,4 +43,9 @@
public BloomFilter getBloomFilter() {
return bloomFilter;
}
+
+ @Override
+ public long getComponentSize() {
+ return btree.getFileReference().getFile().length() + bloomFilter.getFileReference().getFile().length();
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 6eada4b..668a12c 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -49,7 +49,11 @@
private boolean proceed = true;
public LSMBTreeRangeSearchCursor(ILSMIndexOperationContext opCtx) {
- super(opCtx);
+ this(opCtx, false);
+ }
+
+ public LSMBTreeRangeSearchCursor(ILSMIndexOperationContext opCtx, boolean returnDeletedTuples) {
+ super(opCtx, returnDeletedTuples);
this.copyTuple = new ArrayTupleReference();
this.reusablePred = new RangePredicate(null, null, true, true, null, null);
}
@@ -126,7 +130,7 @@
}
// If there is no previous tuple or the previous tuple can be ignored
if (outputElement == null) {
- if (isDeleted(checkElement)) {
+ if (isDeleted(checkElement) && !returnDeletedTuples) {
// If the key has been deleted then pop it and set needPush to true.
// We cannot push immediately because the tuple may be
// modified if hasNext() is called
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index 3405b60..fd7b82f 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+import java.util.List;
+
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
@@ -30,7 +32,8 @@
public interface ILSMIndexAccessor extends IIndexAccessor {
public void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException;
- public void scheduleMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException;
+ public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMComponent> components)
+ throws HyracksDataException, IndexException;
/**
* Deletes the tuple from the memory component only.
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
index bc6baeb..ec12c23 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
@@ -85,4 +85,6 @@
protected abstract void destroy() throws HyracksDataException;
+ public abstract long getComponentSize();
+
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
index b6f5657..a4af2b7 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
@@ -15,9 +15,13 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+import java.util.ArrayList;
+import java.util.List;
+
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
@@ -32,10 +36,11 @@
@Override
public void diskComponentAdded(final ILSMIndex index) throws HyracksDataException, IndexException {
- if (index.getImmutableComponents().size() >= threshold) {
+ List<ILSMComponent> immutableComponents = new ArrayList<ILSMComponent>(index.getImmutableComponents());
+ if (immutableComponents.size() >= threshold) {
ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
- accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE, immutableComponents);
}
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 145bfe9..cd66937 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -184,6 +184,7 @@
// newComponent is null if the merge op. was not performed.
if (newComponent != null) {
lsmIndex.subsumeMergedComponents(newComponent, ctx.getComponentHolder());
+ mergePolicy.diskComponentAdded(lsmIndex);
}
break;
default:
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
index 45cc69b..2bc45a9 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
@@ -42,11 +42,13 @@
protected boolean includeMutableComponent;
protected ILSMHarness lsmHarness;
protected final ILSMIndexOperationContext opCtx;
+ protected final boolean returnDeletedTuples;
protected List<ILSMComponent> operationalComponents;
- public LSMIndexSearchCursor(ILSMIndexOperationContext opCtx) {
+ public LSMIndexSearchCursor(ILSMIndexOperationContext opCtx, boolean returnDeletedTuples) {
this.opCtx = opCtx;
+ this.returnDeletedTuples = returnDeletedTuples;
outputElement = null;
needPush = false;
}
@@ -110,14 +112,14 @@
@Override
public void close() throws HyracksDataException {
- if (lsmHarness != null) {
- try {
- outputPriorityQueue.clear();
- for (int i = 0; i < rangeCursors.length; i++) {
- rangeCursors[i].close();
- }
- rangeCursors = null;
- } finally {
+ try {
+ outputPriorityQueue.clear();
+ for (int i = 0; i < rangeCursors.length; i++) {
+ rangeCursors[i].close();
+ }
+ rangeCursors = null;
+ } finally {
+ if (lsmHarness != null) {
lsmHarness.endSearch(opCtx);
}
}
@@ -154,7 +156,50 @@
return ((ILSMTreeTupleReference) checkElement.getTuple()).isAntimatter();
}
- abstract protected void checkPriorityQueue() throws HyracksDataException, IndexException;
+ protected void checkPriorityQueue() throws HyracksDataException, IndexException {
+ while (!outputPriorityQueue.isEmpty() || needPush == true) {
+ if (!outputPriorityQueue.isEmpty()) {
+ PriorityQueueElement checkElement = outputPriorityQueue.peek();
+ // If there is no previous tuple or the previous tuple can be ignored
+ if (outputElement == null) {
+ if (isDeleted(checkElement) && !returnDeletedTuples) {
+ // If the key has been deleted then pop it and set needPush to true.
+ // We cannot push immediately because the tuple may be
+ // modified if hasNext() is called
+ outputElement = outputPriorityQueue.poll();
+ needPush = true;
+ } else {
+ break;
+ }
+ } else {
+ // Compare the previous tuple and the head tuple in the PQ
+ if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
+ // If the previous tuple and the head tuple are
+ // identical
+ // then pop the head tuple and push the next tuple from
+ // the tree of head tuple
+
+ // the head element of PQ is useless now
+ PriorityQueueElement e = outputPriorityQueue.poll();
+ pushIntoPriorityQueue(e);
+ } else {
+ // If the previous tuple and the head tuple are different
+ // the info of previous tuple is useless
+ if (needPush == true) {
+ pushIntoPriorityQueue(outputElement);
+ needPush = false;
+ }
+ outputElement = null;
+ }
+ }
+ } else {
+ // the priority queue is empty and needPush
+ pushIntoPriorityQueue(outputElement);
+ needPush = false;
+ outputElement = null;
+ }
+ }
+ }
@Override
public boolean exclusiveLatchNodes() {
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index f11a061..8c87afa 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -15,12 +15,15 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+import java.util.List;
+
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -116,8 +119,10 @@
}
@Override
- public void scheduleMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException {
+ public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMComponent> components)
+ throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.MERGE);
+ ctx.getComponentHolder().addAll(components);
lsmHarness.scheduleMerge(ctx, callback);
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index f2f058a..160f03c 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -292,7 +292,7 @@
operationalComponents.addAll(immutableComponents);
break;
case MERGE:
- operationalComponents.addAll(immutableComponents);
+ // The merger is responsible of choosing and adding the components that are targeted for the merge operation.
break;
default:
throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
@@ -562,6 +562,31 @@
mergeOp.getBloomFilterMergeTarget(), true);
IInvertedIndex mergedDiskInvertedIndex = component.getInvIndex();
+
+ // In case we must keep the deleted-keys BTrees, then they must be merged *before* merging the inverted indexes so that
+ // lsmHarness.endSearch() is called once when the inverted indexes have been merged.
+ if (mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1) != diskComponents
+ .get(diskComponents.size() - 1)) {
+ // Keep the deleted tuples since the oldest disk component is not included in the merge operation
+
+ LSMInvertedIndexDeletedKeysBTreeMergeCursor btreeCursor = new LSMInvertedIndexDeletedKeysBTreeMergeCursor(
+ opCtx);
+ search(opCtx, btreeCursor, mergePred);
+
+ BTree btree = component.getDeletedKeysBTree();
+ IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f, true, 0L, false);
+ try {
+ while (btreeCursor.hasNext()) {
+ btreeCursor.next();
+ ITupleReference tuple = btreeCursor.getTuple();
+ btreeBulkLoader.add(tuple);
+ }
+ } finally {
+ btreeCursor.close();
+ }
+ btreeBulkLoader.end();
+ }
+
IIndexBulkLoader invIndexBulkLoader = mergedDiskInvertedIndex.createBulkLoader(1.0f, true, 0L, false);
try {
while (cursor.hasNext()) {
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index e31af9a..7f7d5bc 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -15,12 +15,15 @@
package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+import java.util.List;
+
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -84,8 +87,10 @@
}
@Override
- public void scheduleMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException {
+ public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMComponent> components)
+ throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.MERGE);
+ ctx.getComponentHolder().addAll(components);
lsmHarness.scheduleMerge(ctx, callback);
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java
new file mode 100644
index 0000000..3efaaba
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
+
+public class LSMInvertedIndexDeletedKeysBTreeMergeCursor extends LSMIndexSearchCursor {
+
+ public LSMInvertedIndexDeletedKeysBTreeMergeCursor(ILSMIndexOperationContext opCtx) {
+ super(opCtx, true);
+ }
+
+ @Override
+ protected boolean isDeleted(PriorityQueueElement checkElement) throws HyracksDataException, IndexException {
+ return false;
+ }
+
+ @Override
+ public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException,
+ IndexException {
+ LSMInvertedIndexRangeSearchCursorInitialState lsmInitialState = (LSMInvertedIndexRangeSearchCursorInitialState) initialState;
+ cmp = lsmInitialState.getOriginalKeyComparator();
+ operationalComponents = lsmInitialState.getOperationalComponents();
+ // We intentionally set the lsmHarness to null so that we don't call lsmHarness.endSearch() because we already do that when we merge the inverted indexes.
+ lsmHarness = null;
+ int numBTrees = operationalComponents.size();
+ rangeCursors = new IIndexCursor[numBTrees];
+
+ MultiComparator keyCmp = lsmInitialState.getKeyComparator();
+ RangePredicate btreePredicate = new RangePredicate(null, null, true, true, keyCmp, keyCmp);
+ ArrayList<IIndexAccessor> btreeAccessors = lsmInitialState.getDeletedKeysBTreeAccessors();
+ for (int i = 0; i < numBTrees; i++) {
+ rangeCursors[i] = btreeAccessors.get(i).createSearchCursor();
+ btreeAccessors.get(i).search(rangeCursors[i], btreePredicate);
+ }
+ setPriorityQueueComparator();
+ initPriorityQueue();
+ }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
index 323edd1..446e807 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
@@ -19,6 +19,7 @@
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractDiskLSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
public class LSMInvertedIndexDiskComponent extends AbstractDiskLSMComponent {
@@ -53,4 +54,12 @@
public BloomFilter getBloomFilter() {
return bloomFilter;
}
+
+ @Override
+ public long getComponentSize() {
+ return ((OnDiskInvertedIndex) invIndex).getInvListsFile().getFile().length()
+ + ((OnDiskInvertedIndex) invIndex).getBTree().getFileReference().getFile().length()
+ + deletedKeysBTree.getFileReference().getFile().length()
+ + bloomFilter.getFileReference().getFile().length();
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
index cd0dde3..a6ff07e 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
@@ -28,8 +28,8 @@
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingTupleReference;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BloomFilterAwareBTreePointSearchCursor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccessor;
@@ -43,7 +43,7 @@
protected RangePredicate keySearchPred;
public LSMInvertedIndexRangeSearchCursor(ILSMIndexOperationContext opCtx) {
- super(opCtx);
+ super(opCtx, false);
}
@Override
@@ -76,12 +76,12 @@
for (int i = 0; i < operationalComponents.size(); i++) {
ILSMComponent component = operationalComponents.get(i);
if (component.getType() == LSMComponentType.MEMORY) {
- // No need for a bloom filter for the in-memory BTree.
+ // No need for a bloom filter for the in-memory BTree.
deletedKeysBTreeCursors[i] = deletedKeysBTreeAccessors.get(i).createSearchCursor();
} else {
- deletedKeysBTreeCursors[i] = new BloomFilterAwareBTreePointSearchCursor((IBTreeLeafFrame) lsmInitState
- .getgetDeletedKeysBTreeLeafFrameFactory().createFrame(), false,
- ((LSMInvertedIndexDiskComponent) operationalComponents.get(i)).getBloomFilter());
+ deletedKeysBTreeCursors[i] = new BloomFilterAwareBTreePointSearchCursor(
+ (IBTreeLeafFrame) lsmInitState.getgetDeletedKeysBTreeLeafFrameFactory().createFrame(),
+ false, ((LSMInvertedIndexDiskComponent) operationalComponents.get(i)).getBloomFilter());
}
}
}
@@ -114,50 +114,4 @@
}
return false;
}
-
- @Override
- protected void checkPriorityQueue() throws HyracksDataException, IndexException {
- while (!outputPriorityQueue.isEmpty() || needPush == true) {
- if (!outputPriorityQueue.isEmpty()) {
- PriorityQueueElement checkElement = outputPriorityQueue.peek();
- // If there is no previous tuple or the previous tuple can be ignored
- if (outputElement == null) {
- if (isDeleted(checkElement)) {
- // If the key has been deleted then pop it and set needPush to true.
- // We cannot push immediately because the tuple may be
- // modified if hasNext() is called
- outputElement = outputPriorityQueue.poll();
- needPush = true;
- } else {
- break;
- }
- } else {
- // Compare the previous tuple and the head tuple in the PQ
- if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
- // If the previous tuple and the head tuple are
- // identical
- // then pop the head tuple and push the next tuple from
- // the tree of head tuple
-
- // the head element of PQ is useless now
- PriorityQueueElement e = outputPriorityQueue.poll();
- pushIntoPriorityQueue(e);
- } else {
- // If the previous tuple and the head tuple are different
- // the info of previous tuple is useless
- if (needPush == true) {
- pushIntoPriorityQueue(outputElement);
- needPush = false;
- }
- outputElement = null;
- }
- }
- } else {
- // the priority queue is empty and needPush
- pushIntoPriorityQueue(outputElement);
- needPush = false;
- outputElement = null;
- }
- }
- }
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index 0ab2f4d..67d1796 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -462,6 +462,10 @@
public BTree getBTree() {
return btree;
}
+
+ public FileReference getInvListsFile() {
+ return invListsFile;
+ }
public class OnDiskInvertedIndexAccessor implements IInvertedIndexAccessor {
private final OnDiskInvertedIndex index;
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index 04118c8..d973148 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -24,18 +24,15 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
-import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -225,7 +222,7 @@
operationalComponents.addAll(immutableComponents);
break;
case MERGE:
- operationalComponents.addAll(immutableComponents);
+ // The merger is responsible of choosing and adding the components that are targeted for the merge operation.
break;
default:
throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
@@ -332,38 +329,11 @@
ctx.modificationCallback.before(tuple);
ctx.modificationCallback.found(null, tuple);
if (ctx.getOperation() == IndexOperation.INSERT) {
- // Before each insert, we must check whether there exist a killer
- // tuple in the memBTree. If we find a killer tuple, we must truly
- // delete the existing tuple from the BTree, and then insert it to
- // memRTree. Otherwise, the old killer tuple will kill the newly
- // added RTree tuple.
- RangePredicate btreeRangePredicate = new RangePredicate(tuple, tuple, true, true,
- ctx.getBTreeMultiComparator(), ctx.getBTreeMultiComparator());
- ITreeIndexCursor cursor = ctx.currentMutableBTreeAccessor.createSearchCursor();
- ctx.currentMutableBTreeAccessor.search(cursor, btreeRangePredicate);
- boolean foundTupleInMemoryBTree = false;
- try {
- if (cursor.hasNext()) {
- foundTupleInMemoryBTree = true;
- }
- } finally {
- cursor.close();
- }
- if (foundTupleInMemoryBTree) {
- try {
- ctx.currentMutableBTreeAccessor.delete(tuple);
- } catch (TreeIndexNonExistentKeyException e) {
- // Tuple has been deleted in the meantime. Do nothing.
- // This normally shouldn't happen if we are dealing with
- // good citizens since LSMRTree is used as a secondary
- // index and a tuple shouldn't be deleted twice without
- // insert between them.
- }
- } else {
- ctx.currentMutableRTreeAccessor.insert(tuple);
- }
-
+ ctx.currentMutableRTreeAccessor.insert(tuple);
} else {
+ // First remove all entries in the in-memory rtree (if any).
+ ctx.currentMutableRTreeAccessor.delete(tuple);
+ // Insert key into the deleted-keys BTree.
try {
ctx.currentMutableBTreeAccessor.insert(tuple);
} catch (TreeIndexDuplicateKeyException e) {
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 410fde8..68b7fd8 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -309,8 +309,31 @@
LSMRTreeDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getRTreeMergeTarget(),
mergeOp.getBTreeMergeTarget(), mergeOp.getBloomFilterMergeTarget(), true);
- IIndexBulkLoader bulkLoader = mergedComponent.getRTree().createBulkLoader(1.0f, false, 0L, false);
+ // In case we must keep the deleted-keys BTrees, then they must be merged *before* merging the r-trees so that
+ // lsmHarness.endSearch() is called once when the r-trees have been merged.
+ if (mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1) != diskComponents
+ .get(diskComponents.size() - 1)) {
+ // Keep the deleted tuples since the oldest disk component is not included in the merge operation
+
+ LSMRTreeDeletedKeysBTreeMergeCursor btreeCursor = new LSMRTreeDeletedKeysBTreeMergeCursor(opCtx);
+ search(opCtx, btreeCursor, rtreeSearchPred);
+
+ BTree btree = mergedComponent.getBTree();
+ IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f, true, 0L, false);
+ try {
+ while (btreeCursor.hasNext()) {
+ btreeCursor.next();
+ ITupleReference tuple = btreeCursor.getTuple();
+ btreeBulkLoader.add(tuple);
+ }
+ } finally {
+ btreeCursor.close();
+ }
+ btreeBulkLoader.end();
+ }
+
+ IIndexBulkLoader bulkLoader = mergedComponent.getRTree().createBulkLoader(1.0f, false, 0L, false);
try {
while (cursor.hasNext()) {
cursor.next();
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java
new file mode 100644
index 0000000..fe2ed96
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
+
+public class LSMRTreeDeletedKeysBTreeMergeCursor extends LSMIndexSearchCursor {
+
+ public LSMRTreeDeletedKeysBTreeMergeCursor(ILSMIndexOperationContext opCtx) {
+ super(opCtx, true);
+ }
+
+ @Override
+ protected boolean isDeleted(PriorityQueueElement checkElement) throws HyracksDataException, IndexException {
+ return false;
+ }
+
+ @Override
+ public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException,
+ IndexException {
+ LSMRTreeCursorInitialState lsmInitialState = (LSMRTreeCursorInitialState) initialState;
+ cmp = lsmInitialState.getBTreeCmp();
+ operationalComponents = lsmInitialState.getOperationalComponents();
+ // We intentionally set the lsmHarness to null so that we don't call lsmHarness.endSearch() because we already do that when we merge r-trees.
+ lsmHarness = null;
+ int numBTrees = operationalComponents.size();
+ rangeCursors = new IIndexCursor[numBTrees];
+
+ RangePredicate btreePredicate = new RangePredicate(null, null, true, true, cmp, cmp);
+ IIndexAccessor[] btreeAccessors = new ITreeIndexAccessor[numBTrees];
+ for (int i = 0; i < numBTrees; i++) {
+ ILSMComponent component = operationalComponents.get(i);
+ IBTreeLeafFrame leafFrame = (IBTreeLeafFrame) lsmInitialState.getBTreeLeafFrameFactory().createFrame();
+ rangeCursors[i] = new BTreeRangeSearchCursor(leafFrame, false);
+ BTree btree = (BTree) ((LSMRTreeDiskComponent) component).getBTree();
+ btreeAccessors[i] = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ btreeAccessors[i].search(rangeCursors[i], btreePredicate);
+ }
+ setPriorityQueueComparator();
+ initPriorityQueue();
+ }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
index 7bc3f79..c3c0a55 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
@@ -54,4 +54,14 @@
public BloomFilter getBloomFilter() {
return bloomFilter;
}
+
+ @Override
+ public long getComponentSize() {
+ long size = rtree.getFileReference().getFile().length();
+ if (btree != null) {
+ size += btree.getFileReference().getFile().length();
+ size += bloomFilter.getFileReference().getFile().length();
+ }
+ return size;
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
index 30dd467..f669585 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
@@ -79,7 +79,7 @@
ITupleReference currentTuple = rtreeCursors[currentCursor].getTuple();
boolean killerTupleFound = false;
- for (int i = 0; i <= currentCursor; i++) {
+ for (int i = 0; i < currentCursor; i++) {
btreeCursors[i].reset();
btreeRangePredicate.setHighKey(currentTuple, true);
btreeRangePredicate.setLowKey(currentTuple, true);
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
index dd31165..2e6fc78 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
@@ -25,6 +25,8 @@
public class LSMRTreeSortedCursor extends LSMRTreeAbstractCursor {
+ // TODO: This class can be removed and instead use a search cursor that uses a logic similar
+ // to the one in LSMRTreeWithAntiMatterTuplesSearchCursor
private ILinearizeComparator linearizeCmp;
private boolean[] depletedRtreeCursors;
private int foundIn = -1;
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 0996257..f282f78 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -76,9 +76,10 @@
ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
ILSMIOOperationCallbackProvider ioOpCallbackProvider) {
super(virtualBufferCaches, rtreeInteriorFrameFactory, rtreeLeafFrameFactory, btreeInteriorFrameFactory,
- btreeLeafFrameFactory, fileManager, new LSMRTreeWithAntiMatterTuplesDiskComponentFactory(diskRTreeFactory),
- diskFileMapProvider, fieldCount, rtreeCmpFactories, btreeCmpFactories, linearizer, comparatorFields,
- linearizerArray, 0, mergePolicy, opTracker, ioScheduler, ioOpCallbackProvider);
+ btreeLeafFrameFactory, fileManager, new LSMRTreeWithAntiMatterTuplesDiskComponentFactory(
+ diskRTreeFactory), diskFileMapProvider, fieldCount, rtreeCmpFactories, btreeCmpFactories,
+ linearizer, comparatorFields, linearizerArray, 0, mergePolicy, opTracker, ioScheduler,
+ ioOpCallbackProvider);
bulkLoaComponentFactory = new LSMRTreeWithAntiMatterTuplesDiskComponentFactory(bulkLoadRTreeFactory);
this.bTreeTupleSorter = null;
}
@@ -248,7 +249,12 @@
LSMRTreeOpContext rctx = createOpContext(NoOpOperationCallback.INSTANCE);
rctx.setOperation(IndexOperation.MERGE);
List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
- ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor(rctx);
+ boolean returnDeletedTuples = false;
+ if (ctx.getComponentHolder().get(ctx.getComponentHolder().size() - 1) != diskComponents.get(diskComponents
+ .size() - 1)) {
+ returnDeletedTuples = true;
+ }
+ ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor(rctx, returnDeletedTuples);
LSMComponentFileReferences relMergeFileRefs = getMergeTargetFileName(mergingComponents);
ILSMIndexAccessorInternal accessor = new LSMRTreeWithAntiMatterTuplesAccessor(lsmHarness, rctx);
ioScheduler.scheduleOperation(new LSMRTreeMergeOperation(accessor, mergingComponents, cursor, relMergeFileRefs
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
index cbaf3b3..7099d7d 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
@@ -54,7 +54,11 @@
private int numMutableComponents;
public LSMRTreeWithAntiMatterTuplesSearchCursor(ILSMIndexOperationContext opCtx) {
- super(opCtx);
+ this(opCtx, false);
+ }
+
+ public LSMRTreeWithAntiMatterTuplesSearchCursor(ILSMIndexOperationContext opCtx, boolean returnDeletedTuples) {
+ super(opCtx, returnDeletedTuples);
currentCursor = 0;
}
@@ -152,7 +156,7 @@
while (super.hasNext()) {
super.next();
ITupleReference diskRTreeTuple = super.getTuple();
- if (searchMemBTrees(diskRTreeTuple, numMutableComponents - 1)) {
+ if (searchMemBTrees(diskRTreeTuple, numMutableComponents)) {
foundNext = true;
frameTuple = diskRTreeTuple;
return true;
@@ -216,7 +220,7 @@
private boolean searchMemBTrees(ITupleReference tuple, int lastBTreeToSearch) throws HyracksDataException,
IndexException {
- for (int i = 0; i <= lastBTreeToSearch; i++) {
+ for (int i = 0; i < lastBTreeToSearch; i++) {
btreeCursors[i].reset();
btreeRangePredicate.setHighKey(tuple, true);
btreeRangePredicate.setLowKey(tuple, true);
@@ -261,50 +265,4 @@
}
}
}
-
- @Override
- protected void checkPriorityQueue() throws HyracksDataException, IndexException {
- while (!outputPriorityQueue.isEmpty() || needPush == true) {
- if (!outputPriorityQueue.isEmpty()) {
- PriorityQueueElement checkElement = outputPriorityQueue.peek();
- // If there is no previous tuple or the previous tuple can be ignored
- if (outputElement == null) {
- if (isDeleted(checkElement)) {
- // If the key has been deleted then pop it and set needPush to true.
- // We cannot push immediately because the tuple may be
- // modified if hasNext() is called
- outputElement = outputPriorityQueue.poll();
- needPush = true;
- } else {
- break;
- }
- } else {
- // Compare the previous tuple and the head tuple in the PQ
- if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
- // If the previous tuple and the head tuple are
- // identical
- // then pop the head tuple and push the next tuple from
- // the tree of head tuple
-
- // the head element of PQ is useless now
- PriorityQueueElement e = outputPriorityQueue.poll();
- pushIntoPriorityQueue(e);
- } else {
- // If the previous tuple and the head tuple are different
- // the info of previous tuple is useless
- if (needPush == true) {
- pushIntoPriorityQueue(outputElement);
- needPush = false;
- }
- outputElement = null;
- }
- }
- } else {
- // the priority queue is empty and needPush
- pushIntoPriorityQueue(outputElement);
- needPush = false;
- outputElement = null;
- }
- }
- }
}
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
index f7aa7f4..9729d51 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.storage.am.btree.OrderedIndexTestUtils;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
@@ -70,7 +71,8 @@
}
ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
- accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE,
+ ((LSMBTree) ctx.getIndex()).getImmutableComponents());
orderedIndexTestUtils.checkPointSearches(ctx);
orderedIndexTestUtils.checkScan(ctx);
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
index 4a96131..35ecc20 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
@@ -106,7 +106,7 @@
break;
case MERGE:
- accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE, lsmBTree.getImmutableComponents());
break;
default:
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
index 5115b7e..3713498 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
@@ -25,6 +25,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexLoadTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndex;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestUtils;
@@ -54,7 +55,8 @@
invIndex.activate();
}
// Perform merge.
- invIndexAccessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ invIndexAccessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE,
+ ((LSMInvertedIndex) invIndex).getImmutableComponents());
validateAndCheckIndex(testCtx);
runTinySearchWorkload(testCtx, tupleGen);
}
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
index 523557d..53076fa 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
@@ -25,6 +25,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexLoadTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndex;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestUtils;
@@ -54,7 +55,8 @@
invIndex.activate();
}
// Perform merge.
- invIndexAccessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ invIndexAccessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE,
+ ((LSMInvertedIndex) invIndex).getImmutableComponents());
validateAndCheckIndex(testCtx);
runTinySearchWorkload(testCtx, tupleGen);
}
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
index e1570af..35afd09 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
@@ -109,7 +109,7 @@
}
case MERGE: {
- accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE, invIndex.getImmutableComponents());
break;
}
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
index 18528c4..155f537 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTree;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeTestContext;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeTestDriver;
import edu.uci.ics.hyracks.storage.am.rtree.RTreeTestUtils;
@@ -72,7 +73,8 @@
}
ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
- accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE,
+ ((LSMRTree) ctx.getIndex()).getImmutableComponents());
rTreeTestUtils.checkScan(ctx);
rTreeTestUtils.checkDiskOrderScan(ctx);
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
index b3fddc8..c7a86d9 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
@@ -73,7 +73,7 @@
break;
case MERGE:
- accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE, lsmRTree.getImmutableComponents());
break;
default:
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
index 12d7742..c436d9f 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
@@ -25,6 +25,7 @@
import edu.uci.ics.hyracks.storage.am.common.datagen.DataGenThread;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTree;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeWithAntiMatterTuples.LSMRTreeWithAntiMatterTuplesAccessor;
import edu.uci.ics.hyracks.storage.am.rtree.impls.SearchPredicate;
@@ -61,7 +62,7 @@
break;
case MERGE:
- accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE, ((LSMRTree) lsmRTree).getImmutableComponents());
break;
default: