major reworking of all lsm indexes with respect to synchronization and interfacing with the lsmharness
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@2681 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java
index db3f615..73a67ea 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java
@@ -27,7 +27,7 @@
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.PartitionedLSMInvertedIndexDataflowHelperFactory;
@@ -51,7 +51,7 @@
PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) };
invertedIndexDataflowHelperFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
- new ConstantMergePolicyProvider(MERGE_THRESHOLD), RefCountingOperationTrackerFactory.INSTANCE,
+ new ConstantMergePolicyProvider(MERGE_THRESHOLD), ThreadCountingOperationTrackerFactory.INSTANCE,
SynchronousSchedulerProvider.INSTANCE);
}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
index 4ac6892..7ee654c 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
@@ -25,7 +25,7 @@
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDataflowHelperFactory;
@@ -46,7 +46,7 @@
PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) };
invertedIndexDataflowHelperFactory = new LSMInvertedIndexDataflowHelperFactory(new ConstantMergePolicyProvider(
- MERGE_THRESHOLD), RefCountingOperationTrackerFactory.INSTANCE, SynchronousSchedulerProvider.INSTANCE);
+ MERGE_THRESHOLD), ThreadCountingOperationTrackerFactory.INSTANCE, SynchronousSchedulerProvider.INSTANCE);
}
@Override
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
index 2be0e83..7002bbf 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
@@ -19,7 +19,7 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
import edu.uci.ics.hyracks.tests.am.common.LSMTreeOperatorTestHelper;
@@ -33,7 +33,7 @@
public IIndexDataflowHelperFactory createDataFlowHelperFactory() {
return new LSMBTreeDataflowHelperFactory(new ConstantMergePolicyProvider(MERGE_THRESHOLD),
- RefCountingOperationTrackerFactory.INSTANCE, SynchronousSchedulerProvider.INSTANCE);
+ ThreadCountingOperationTrackerFactory.INSTANCE, SynchronousSchedulerProvider.INSTANCE);
}
}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
index 8c6c99f..f36f153 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
@@ -21,7 +21,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.dataflow.LSMRTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreePolicyType;
@@ -39,7 +39,7 @@
IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
IBinaryComparatorFactory[] btreeComparatorFactories, ILinearizeComparatorFactory linearizerCmpFactory) {
return new LSMRTreeDataflowHelperFactory(valueProviderFactories, rtreePolicyType, btreeComparatorFactories,
- new ConstantMergePolicyProvider(MERGE_THRESHOLD), RefCountingOperationTrackerFactory.INSTANCE,
+ new ConstantMergePolicyProvider(MERGE_THRESHOLD), ThreadCountingOperationTrackerFactory.INSTANCE,
SynchronousSchedulerProvider.INSTANCE, linearizerCmpFactory);
}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java
index 46ff507..b888f2a 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java
@@ -21,7 +21,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.dataflow.LSMRTreeWithAntiMatterTuplesDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreePolicyType;
@@ -40,7 +40,7 @@
IBinaryComparatorFactory[] btreeComparatorFactories, ILinearizeComparatorFactory linearizerCmpFactory) {
return new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(valueProviderFactories, rtreePolicyType,
btreeComparatorFactories, new ConstantMergePolicyProvider(MERGE_THRESHOLD),
- RefCountingOperationTrackerFactory.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
+ ThreadCountingOperationTrackerFactory.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
linearizerCmpFactory);
}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java
index 8990f3f..1885348 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java
@@ -24,5 +24,6 @@
DISKORDERSCAN,
PHYSICALDELETE,
NOOP,
- MERGE
+ MERGE,
+ FLUSH
}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 40f4c09..6c6b964 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -16,7 +16,6 @@
package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
import java.io.File;
-import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
@@ -31,6 +30,7 @@
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
@@ -63,7 +63,6 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentState;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMFlushOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
@@ -73,13 +72,13 @@
public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
// In-memory components.
- private final LSMBTreeComponent mutableComponent;
+ private final LSMBTreeMutableComponent mutableComponent;
// For creating BTree's used in flush and merge.
- private final LSMBTreeComponentFactory componentFactory;
+ private final LSMBTreeImmutableComponentFactory componentFactory;
// For creating BTree's used in bulk load. Different from diskBTreeFactory
// because it should have a different tuple writer in it's leaf frames.
- private final LSMBTreeComponentFactory bulkLoadComponentFactory;
+ private final LSMBTreeImmutableComponentFactory bulkLoadComponentFactory;
// Common for in-memory and on-disk components.
private final ITreeIndexFrameFactory insertLeafFrameFactory;
@@ -95,14 +94,15 @@
ILSMIOOperationScheduler ioScheduler) {
super(memFreePageManager, diskBTreeFactory.getBufferCache(), fileManager, diskFileMapProvider, mergePolicy,
opTrackerFactory, ioScheduler);
- mutableComponent = new LSMBTreeComponent(new BTree(memBufferCache,
+ mutableComponent = new LSMBTreeMutableComponent(new BTree(memBufferCache,
((InMemoryBufferCache) memBufferCache).getFileMapProvider(), memFreePageManager, interiorFrameFactory,
- insertLeafFrameFactory, cmpFactories, fieldCount, new FileReference(new File("membtree"))));
+ insertLeafFrameFactory, cmpFactories, fieldCount, new FileReference(new File("membtree"))),
+ memFreePageManager);
this.insertLeafFrameFactory = insertLeafFrameFactory;
this.deleteLeafFrameFactory = deleteLeafFrameFactory;
this.cmpFactories = cmpFactories;
- componentFactory = new LSMBTreeComponentFactory(diskBTreeFactory);
- bulkLoadComponentFactory = new LSMBTreeComponentFactory(bulkLoadBTreeFactory);
+ componentFactory = new LSMBTreeImmutableComponentFactory(diskBTreeFactory);
+ bulkLoadComponentFactory = new LSMBTreeImmutableComponentFactory(bulkLoadBTreeFactory);
}
@Override
@@ -113,7 +113,7 @@
fileManager.deleteDirs();
fileManager.createDirs();
- immutableComponents.clear();
+ componentsRef.get().clear();
}
@Override
@@ -125,6 +125,7 @@
((InMemoryBufferCache) mutableComponent.getBTree().getBufferCache()).open();
mutableComponent.getBTree().create();
mutableComponent.getBTree().activate();
+ List<ILSMComponent> immutableComponents = componentsRef.get();
immutableComponents.clear();
List<LSMComponentFileReferences> validFileReferences;
try {
@@ -133,7 +134,7 @@
throw new HyracksDataException(e);
}
for (LSMComponentFileReferences lsmComonentFileReference : validFileReferences) {
- LSMBTreeComponent btree;
+ LSMBTreeImmutableComponent btree;
try {
btree = createDiskComponent(componentFactory, lsmComonentFileReference.getInsertIndexFileReference(),
false);
@@ -160,8 +161,9 @@
throw new HyracksDataException(e);
}
+ List<ILSMComponent> immutableComponents = componentsRef.get();
for (ILSMComponent c : immutableComponents) {
- BTree btree = (BTree) ((LSMBTreeComponent) c).getBTree();
+ BTree btree = (BTree) ((LSMBTreeImmutableComponent) c).getBTree();
btree.deactivate();
}
mutableComponent.getBTree().deactivate();
@@ -176,8 +178,9 @@
throw new HyracksDataException("Failed to destroy the index since it is activated.");
}
+ List<ILSMComponent> immutableComponents = componentsRef.get();
for (ILSMComponent c : immutableComponents) {
- BTree btree = (BTree) ((LSMBTreeComponent) c).getBTree();
+ BTree btree = (BTree) ((LSMBTreeImmutableComponent) c).getBTree();
btree.destroy();
}
mutableComponent.getBTree().destroy();
@@ -190,9 +193,10 @@
throw new HyracksDataException("Failed to clear the index since it is not activated.");
}
+ List<ILSMComponent> immutableComponents = componentsRef.get();
mutableComponent.getBTree().clear();
for (ILSMComponent c : immutableComponents) {
- BTree btree = (BTree) ((LSMBTreeComponent) c).getBTree();
+ BTree btree = (BTree) ((LSMBTreeImmutableComponent) c).getBTree();
btree.deactivate();
btree.destroy();
}
@@ -200,33 +204,33 @@
}
@Override
- public List<ILSMComponent> getOperationalComponents(IIndexOperationContext ctx) {
- List<ILSMComponent> operationalComponents = new ArrayList<ILSMComponent>();
+ public void getOperationalComponents(ILSMIndexOperationContext ctx) {
+ List<ILSMComponent> immutableComponents = componentsRef.get();
+ List<ILSMComponent> operationalComponents = ctx.getComponentHolder();
+ operationalComponents.clear();
switch (ctx.getOperation()) {
+ case UPDATE:
+ case UPSERT:
+ case PHYSICALDELETE:
+ case FLUSH:
+ case DELETE:
+ operationalComponents.add(mutableComponent);
+ break;
case SEARCH:
case INSERT:
- // TODO: We should add the mutable component at some point.
+ operationalComponents.add(mutableComponent);
operationalComponents.addAll(immutableComponents);
break;
case MERGE:
- // TODO: determining the participating components in a merge should probably the task of the merge policy.
- if (immutableComponents.size() > 1) {
- for (ILSMComponent c : immutableComponents) {
- if (c.negativeCompareAndSet(LSMComponentState.MERGING, LSMComponentState.MERGING)) {
- operationalComponents.add(c);
- }
- }
- }
+ operationalComponents.addAll(immutableComponents);
break;
default:
throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
}
- return operationalComponents;
}
@Override
- public void insertUpdateOrDelete(ITupleReference tuple, IIndexOperationContext ictx) throws HyracksDataException,
- IndexException {
+ public void modify(IIndexOperationContext ictx, ITupleReference tuple) throws HyracksDataException, IndexException {
LSMBTreeOpContext ctx = (LSMBTreeOpContext) ictx;
switch (ctx.getOperation()) {
case PHYSICALDELETE:
@@ -268,7 +272,7 @@
// TODO: Can we just remove the above code that search the mutable component and do it together with the search call below? i.e. instead of passing false to the lsmHarness.search(), we pass true to include the mutable component?
// the key was not in the inmemory component, so check the disk components
- lsmHarness.search(searchCursor, predicate, ctx, false);
+ search(ctx, searchCursor, predicate);
try {
if (searchCursor.hasNext()) {
throw new BTreeDuplicateKeyException("Failed to insert key since key already exists.");
@@ -282,16 +286,71 @@
}
@Override
+ public void search(ILSMIndexOperationContext ictx, IIndexCursor cursor, ISearchPredicate pred)
+ throws HyracksDataException, IndexException {
+ LSMBTreeOpContext ctx = (LSMBTreeOpContext) ictx;
+ LSMBTreeRangeSearchCursor lsmTreeCursor = (LSMBTreeRangeSearchCursor) cursor;
+ List<ILSMComponent> operationalComponents = ctx.getComponentHolder();
+ int numBTrees = operationalComponents.size();
+ assert numBTrees > 0;
+
+ boolean includeMutableComponent = operationalComponents.get(0) == mutableComponent;
+ LSMBTreeCursorInitialState initialState = new LSMBTreeCursorInitialState(numBTrees, insertLeafFrameFactory,
+ ctx.cmp, includeMutableComponent, lsmHarness, ctx.memBTreeAccessor, pred, ctx.searchCallback,
+ operationalComponents);
+ lsmTreeCursor.open(initialState, pred);
+
+ int cursorIx;
+ ListIterator<ILSMComponent> diskBTreesIter = operationalComponents.listIterator();
+ if (includeMutableComponent) {
+ // Open cursor of in-memory BTree at index 0.
+ ctx.memBTreeAccessor.search(lsmTreeCursor.getCursor(0), pred);
+ // Skip 0 because it is the in-memory BTree.
+ cursorIx = 1;
+ diskBTreesIter.next();
+ } else {
+ cursorIx = 0;
+ }
+
+ // Open cursors of on-disk BTrees.
+ int numDiskComponents = includeMutableComponent ? numBTrees - 1 : numBTrees;
+ ITreeIndexAccessor[] diskBTreeAccessors = new ITreeIndexAccessor[numDiskComponents];
+ int diskBTreeIx = 0;
+ while (diskBTreesIter.hasNext()) {
+ BTree diskBTree = (BTree) ((LSMBTreeImmutableComponent) diskBTreesIter.next()).getBTree();
+ diskBTreeAccessors[diskBTreeIx] = diskBTree.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ diskBTreeAccessors[diskBTreeIx].search(lsmTreeCursor.getCursor(cursorIx), pred);
+ cursorIx++;
+ diskBTreeIx++;
+ }
+ lsmTreeCursor.initPriorityQueue();
+ }
+
+ @Override
+ public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException {
+ LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
+ LSMBTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ assert ctx.getComponentHolder().size() == 1;
+ ILSMComponent flushingComponent = ctx.getComponentHolder().get(0);
+ opCtx.setOperation(IndexOperation.FLUSH);
+ opCtx.getComponentHolder().add(flushingComponent);
+ ILSMIndexAccessorInternal flushAccessor = new LSMBTreeAccessor(lsmHarness, opCtx);
+ ioScheduler.scheduleOperation(new LSMFlushOperation(flushAccessor, flushingComponent, componentFileRefs
+ .getInsertIndexFileReference(), callback));
+ }
+
+ @Override
public ILSMComponent flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
LSMFlushOperation flushOp = (LSMFlushOperation) operation;
- // Bulk load a new on-disk BTree from the in-memory BTree.
+ LSMBTreeMutableComponent flushingComponent = (LSMBTreeMutableComponent) flushOp.getFlushingComponent();
+ IIndexAccessor accessor = flushingComponent.getBTree().createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ IIndexCursor scanCursor = accessor.createSearchCursor();
RangePredicate nullPred = new RangePredicate(null, null, true, true, null, null);
- ITreeIndexAccessor memBTreeAccessor = mutableComponent.getBTree().createAccessor(
- NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- IIndexCursor scanCursor = memBTreeAccessor.createSearchCursor();
- memBTreeAccessor.search(scanCursor, nullPred);
- LSMBTreeComponent component = createDiskComponent(componentFactory, flushOp.getFlushTarget(), true);
- // Bulk load the tuples from the in-memory BTree into the new disk BTree.
+ accessor.search(scanCursor, nullPred);
+ LSMBTreeImmutableComponent component = createDiskComponent(componentFactory, flushOp.getFlushTarget(), true);
IIndexBulkLoader bulkLoader = component.getBTree().createBulkLoader(1.0f, false);
try {
while (scanCursor.hasNext()) {
@@ -305,71 +364,26 @@
return component;
}
- @Override
- public void resetMutableComponent() throws HyracksDataException {
- memFreePageManager.reset();
- mutableComponent.getBTree().clear();
- }
+ public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException, IndexException {
+ LSMBTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
+ opCtx.getComponentHolder().addAll(mergingComponents);
+ ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor(opCtx);
+ RangePredicate rangePred = new RangePredicate(null, null, true, true, null, null);
+ search(opCtx, cursor, rangePred);
- private ILSMComponent createBulkLoadTarget() throws HyracksDataException, IndexException {
- LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
- return createDiskComponent(bulkLoadComponentFactory, componentFileRefs.getInsertIndexFileReference(), true);
- }
-
- private LSMBTreeComponent createDiskComponent(LSMBTreeComponentFactory factory, FileReference fileRef,
- boolean createComponent) throws HyracksDataException, IndexException {
- // Create new BTree instance.
- LSMBTreeComponent component = (LSMBTreeComponent) factory
- .createLSMComponentInstance(new LSMComponentFileReferences(fileRef, null));
- if (createComponent) {
- component.getBTree().create();
- }
- // BTree will be closed during cleanup of merge().
- component.getBTree().activate();
- return component;
- }
-
- @Override
- public void search(IIndexCursor cursor, List<ILSMComponent> immutableComponents, ISearchPredicate pred,
- IIndexOperationContext ictx, boolean includeMutableComponent) throws HyracksDataException, IndexException {
- LSMBTreeOpContext ctx = (LSMBTreeOpContext) ictx;
- LSMBTreeRangeSearchCursor lsmTreeCursor = (LSMBTreeRangeSearchCursor) cursor;
- int numDiskComponents = immutableComponents.size();
- int numBTrees = (includeMutableComponent) ? numDiskComponents + 1 : numDiskComponents;
-
- List<ILSMComponent> operationalComponents = new ArrayList<ILSMComponent>();
- if (includeMutableComponent) {
- operationalComponents.add(getMutableComponent());
- }
- operationalComponents.addAll(immutableComponents);
- LSMBTreeCursorInitialState initialState = new LSMBTreeCursorInitialState(numBTrees, insertLeafFrameFactory,
- ctx.cmp, includeMutableComponent, lsmHarness, ctx.memBTreeAccessor, pred, ctx.searchCallback,
- operationalComponents);
- lsmTreeCursor.open(initialState, pred);
-
- int cursorIx;
- if (includeMutableComponent) {
- // Open cursor of in-memory BTree at index 0.
- ctx.memBTreeAccessor.search(lsmTreeCursor.getCursor(0), pred);
- // Skip 0 because it is the in-memory BTree.
- cursorIx = 1;
- } else {
- cursorIx = 0;
- }
-
- // Open cursors of on-disk BTrees.
- ITreeIndexAccessor[] diskBTreeAccessors = new ITreeIndexAccessor[numDiskComponents];
- int diskBTreeIx = 0;
- ListIterator<ILSMComponent> diskBTreesIter = immutableComponents.listIterator();
- while (diskBTreesIter.hasNext()) {
- BTree diskBTree = (BTree) ((LSMBTreeComponent) diskBTreesIter.next()).getBTree();
- diskBTreeAccessors[diskBTreeIx] = diskBTree.createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE);
- diskBTreeAccessors[diskBTreeIx].search(lsmTreeCursor.getCursor(cursorIx), pred);
- cursorIx++;
- diskBTreeIx++;
- }
- lsmTreeCursor.initPriorityQueue();
+ opCtx.setOperation(IndexOperation.MERGE);
+ BTree firstBTree = (BTree) ((LSMBTreeImmutableComponent) mergingComponents.get(0)).getBTree();
+ BTree lastBTree = (BTree) ((LSMBTreeImmutableComponent) mergingComponents.get(mergingComponents.size() - 1))
+ .getBTree();
+ FileReference firstFile = diskFileMapProvider.lookupFileName(firstBTree.getFileId());
+ FileReference lastFile = diskFileMapProvider.lookupFileName(lastBTree.getFileId());
+ LSMComponentFileReferences relMergeFileRefs = fileManager.getRelMergeFileReference(firstFile.getFile()
+ .getName(), lastFile.getFile().getName());
+ ILSMIndexAccessorInternal accessor = new LSMBTreeAccessor(lsmHarness, opCtx);
+ ioScheduler.scheduleOperation(new LSMBTreeMergeOperation(accessor, mergingComponents, cursor, relMergeFileRefs
+ .getInsertIndexFileReference(), callback));
}
@Override
@@ -377,17 +391,8 @@
throws HyracksDataException, IndexException {
LSMBTreeMergeOperation mergeOp = (LSMBTreeMergeOperation) operation;
ITreeIndexCursor cursor = mergeOp.getCursor();
-
mergedComponents.addAll(mergeOp.getMergingComponents());
-
- // Nothing to merge.
- if (mergedComponents.size() <= 1) {
- cursor.close();
- return null;
- }
-
- // Bulk load the tuples from all on-disk BTrees into the new BTree.
- LSMBTreeComponent mergedBTree = createDiskComponent(componentFactory, mergeOp.getMergeTarget(), true);
+ LSMBTreeImmutableComponent mergedBTree = createDiskComponent(componentFactory, mergeOp.getMergeTarget(), true);
IIndexBulkLoader bulkLoader = mergedBTree.getBTree().createBulkLoader(1.0f, false);
try {
while (cursor.hasNext()) {
@@ -402,11 +407,36 @@
return mergedBTree;
}
+ private LSMBTreeImmutableComponent createDiskComponent(LSMBTreeImmutableComponentFactory factory,
+ FileReference fileRef, boolean createComponent) throws HyracksDataException, IndexException {
+ // Create new BTree instance.
+ LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) factory
+ .createLSMComponentInstance(new LSMComponentFileReferences(fileRef, null));
+ if (createComponent) {
+ component.getBTree().create();
+ }
+ // BTree will be closed during cleanup of merge().
+ component.getBTree().activate();
+ return component;
+ }
+
@Override
public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput) throws TreeIndexException {
return new LSMBTreeBulkLoader(fillLevel, verifyInput);
}
+ private ILSMComponent createBulkLoadTarget() throws HyracksDataException, IndexException {
+ LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
+ return createDiskComponent(bulkLoadComponentFactory, componentFileRefs.getInsertIndexFileReference(), true);
+ }
+
+ @Override
+ public void markAsValid(ILSMComponent lsmComponent) throws HyracksDataException {
+ BTree btree = ((LSMBTreeImmutableComponent) lsmComponent).getBTree();
+ forceFlushDirtyPages(btree);
+ markAsValidInternal(btree);
+ }
+
public class LSMBTreeBulkLoader implements IIndexBulkLoader {
private final ILSMComponent component;
private final BTreeBulkLoader bulkLoader;
@@ -419,8 +449,8 @@
} catch (IndexException e) {
throw new TreeIndexException(e);
}
- bulkLoader = (BTreeBulkLoader) ((LSMBTreeComponent) component).getBTree().createBulkLoader(fillFactor,
- verifyInput);
+ bulkLoader = (BTreeBulkLoader) ((LSMBTreeImmutableComponent) component).getBTree().createBulkLoader(
+ fillFactor, verifyInput);
}
@Override
@@ -440,8 +470,8 @@
}
protected void handleException() throws HyracksDataException {
- ((LSMBTreeComponent) component).getBTree().deactivate();
- ((LSMBTreeComponent) component).getBTree().destroy();
+ ((LSMBTreeImmutableComponent) component).getBTree().deactivate();
+ ((LSMBTreeImmutableComponent) component).getBTree().destroy();
}
@Override
@@ -452,40 +482,6 @@
}
- @Override
- public ITreeIndexFrameFactory getLeafFrameFactory() {
- return mutableComponent.getBTree().getLeafFrameFactory();
- }
-
- @Override
- public ITreeIndexFrameFactory getInteriorFrameFactory() {
- return mutableComponent.getBTree().getInteriorFrameFactory();
- }
-
- @Override
- public IFreePageManager getFreePageManager() {
- return mutableComponent.getBTree().getFreePageManager();
- }
-
- @Override
- public int getFieldCount() {
- return mutableComponent.getBTree().getFieldCount();
- }
-
- @Override
- public int getRootPageId() {
- return mutableComponent.getBTree().getRootPageId();
- }
-
- @Override
- public int getFileId() {
- return mutableComponent.getBTree().getFileId();
- }
-
- public IBinaryComparatorFactory[] getComparatorFactories() {
- return cmpFactories;
- }
-
public LSMBTreeOpContext createOpContext(IModificationOperationCallback modificationCallback,
ISearchOperationCallback searchCallback) {
return new LSMBTreeOpContext(mutableComponent.getBTree(), insertLeafFrameFactory, deleteLeafFrameFactory,
@@ -493,7 +489,7 @@
}
@Override
- public ILSMIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
+ public ILSMIndexAccessorInternal createAccessor(IModificationOperationCallback modificationCallback,
ISearchOperationCallback searchCallback) {
return new LSMBTreeAccessor(lsmHarness, createOpContext(modificationCallback, searchCallback));
}
@@ -512,51 +508,6 @@
LSMBTreeOpContext concreteCtx = (LSMBTreeOpContext) ctx;
return concreteCtx.cmp;
}
-
- @Override
- public void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException {
- LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
- lsmHarness.getIOScheduler().scheduleOperation(
- new LSMFlushOperation((ILSMIndexAccessorInternal) createAccessor(null, null), componentFileRefs
- .getInsertIndexFileReference(), callback));
- }
- }
-
- public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException {
- LSMBTreeOpContext ctx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- ctx.setOperation(IndexOperation.MERGE);
- ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor(ctx);
- RangePredicate rangePred = new RangePredicate(null, null, true, true, null, null);
- // Ordered scan, ignoring the in-memory BTree.
- // We get back a snapshot of the on-disk BTrees that are going to be
- // merged now, so we can clean them up after the merge has completed.
- List<ILSMComponent> mergingDiskComponents;
- try {
- mergingDiskComponents = lsmHarness.search(cursor, (RangePredicate) rangePred, ctx, false);
- if (mergingDiskComponents.size() <= 1) {
- cursor.close();
- return null;
- }
- } catch (IndexException e) {
- throw new HyracksDataException(e);
- }
-
- BTree firstBTree = (BTree) ((LSMBTreeComponent) mergingDiskComponents.get(0)).getBTree();
- BTree lastBTree = (BTree) ((LSMBTreeComponent) mergingDiskComponents.get(mergingDiskComponents.size() - 1))
- .getBTree();
- FileReference firstFile = diskFileMapProvider.lookupFileName(firstBTree.getFileId());
- FileReference lastFile = diskFileMapProvider.lookupFileName(lastBTree.getFileId());
- LSMComponentFileReferences relMergeFileRefs = fileManager.getRelMergeFileReference(firstFile.getFile()
- .getName(), lastFile.getFile().getName());
- return new LSMBTreeMergeOperation((ILSMIndexAccessorInternal) createAccessor(null, null),
- mergingDiskComponents, cursor, relMergeFileRefs.getInsertIndexFileReference(), callback);
- }
-
- @Override
- public void markAsValid(ILSMComponent lsmComponent) throws HyracksDataException {
- BTree btree = ((LSMBTreeComponent) lsmComponent).getBTree();
- forceFlushDirtyPages(btree);
- markAsValidInternal(btree);
}
@Override
@@ -564,19 +515,33 @@
return diskBufferCache;
}
- public boolean isEmptyIndex() throws HyracksDataException {
- return immutableComponents.isEmpty()
- && mutableComponent.getBTree().isEmptyTree(
- mutableComponent.getBTree().getInteriorFrameFactory().createFrame());
+ public IBinaryComparatorFactory[] getComparatorFactories() {
+ return cmpFactories;
}
@Override
- public void validate() throws HyracksDataException {
- mutableComponent.getBTree().validate();
- for (ILSMComponent c : immutableComponents) {
- BTree btree = (BTree) ((LSMBTreeComponent) c).getBTree();
- btree.validate();
- }
+ public ITreeIndexFrameFactory getInteriorFrameFactory() {
+ return mutableComponent.getBTree().getInteriorFrameFactory();
+ }
+
+ @Override
+ public int getFieldCount() {
+ return mutableComponent.getBTree().getFieldCount();
+ }
+
+ @Override
+ public int getFileId() {
+ return mutableComponent.getBTree().getFileId();
+ }
+
+ @Override
+ public IFreePageManager getFreePageManager() {
+ return mutableComponent.getBTree().getFreePageManager();
+ }
+
+ @Override
+ public ITreeIndexFrameFactory getLeafFrameFactory() {
+ return mutableComponent.getBTree().getLeafFrameFactory();
}
@Override
@@ -586,7 +551,24 @@
}
@Override
- public ILSMComponent getMutableComponent() {
- return mutableComponent;
+ public int getRootPageId() {
+ return mutableComponent.getBTree().getRootPageId();
+ }
+
+ public boolean isEmptyIndex() throws HyracksDataException {
+ List<ILSMComponent> immutableComponents = componentsRef.get();
+ return immutableComponents.isEmpty()
+ && mutableComponent.getBTree().isEmptyTree(
+ mutableComponent.getBTree().getInteriorFrameFactory().createFrame());
+ }
+
+ @Override
+ public void validate() throws HyracksDataException {
+ mutableComponent.getBTree().validate();
+ List<ILSMComponent> immutableComponents = componentsRef.get();
+ for (ILSMComponent c : immutableComponents) {
+ BTree btree = (BTree) ((LSMBTreeImmutableComponent) c).getBTree();
+ btree.validate();
+ }
}
}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponent.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponent.java
new file mode 100644
index 0000000..2251a49
--- /dev/null
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponent.java
@@ -0,0 +1,24 @@
+package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractImmutableLSMComponent;
+
+public class LSMBTreeImmutableComponent extends AbstractImmutableLSMComponent {
+
+ private final BTree btree;
+
+ public LSMBTreeImmutableComponent(BTree btree) {
+ this.btree = btree;
+ }
+
+ @Override
+ public void destroy() throws HyracksDataException {
+ btree.deactivate();
+ btree.destroy();
+ }
+
+ public BTree getBTree() {
+ return btree;
+ }
+}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeComponentFactory.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponentFactory.java
similarity index 84%
rename from hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeComponentFactory.java
rename to hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponentFactory.java
index 23e0d83..696fc2c 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeComponentFactory.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeImmutableComponentFactory.java
@@ -23,16 +23,16 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
-public class LSMBTreeComponentFactory implements ILSMComponentFactory {
+public class LSMBTreeImmutableComponentFactory implements ILSMComponentFactory {
private final TreeIndexFactory<BTree> btreeFactory;
- public LSMBTreeComponentFactory(TreeIndexFactory<BTree> btreeFactory) {
+ public LSMBTreeImmutableComponentFactory(TreeIndexFactory<BTree> btreeFactory) {
this.btreeFactory = btreeFactory;
}
@Override
public ILSMComponent createLSMComponentInstance(LSMComponentFileReferences cfr) throws IndexException {
- return new LSMBTreeComponent(btreeFactory.createIndexInstance(cfr.getInsertIndexFileReference()));
+ return new LSMBTreeImmutableComponent(btreeFactory.createIndexInstance(cfr.getInsertIndexFileReference()));
}
@Override
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
index 9e38fbf..a3a7097 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
@@ -51,7 +51,7 @@
public Set<IODeviceHandle> getReadDevices() {
Set<IODeviceHandle> devs = new HashSet<IODeviceHandle>();
for (ILSMComponent o : mergingComponents) {
- LSMBTreeComponent component = (LSMBTreeComponent) o;
+ LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) o;
devs.add(component.getBTree().getFileReference().getDeviceHandle());
}
return devs;
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeComponent.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMutableComponent.java
similarity index 66%
rename from hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeComponent.java
rename to hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMutableComponent.java
index 5264f4f..30e79b4 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeComponent.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMutableComponent.java
@@ -17,31 +17,31 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMComponent;
+import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractMutableLSMComponent;
-public class LSMBTreeComponent extends AbstractLSMComponent {
+public class LSMBTreeMutableComponent extends AbstractMutableLSMComponent {
private final BTree btree;
+ private final IInMemoryFreePageManager mfpm;
- public LSMBTreeComponent(BTree btree) {
+ public LSMBTreeMutableComponent(BTree btree, IInMemoryFreePageManager mfpm) {
this.btree = btree;
- }
-
- @Override
- public void destroy() throws HyracksDataException {
- btree.deactivate();
- btree.destroy();
- }
-
- @Override
- public void reset() throws HyracksDataException {
- ((InMemoryFreePageManager) btree.getFreePageManager()).reset();
- btree.clear();
+ this.mfpm = mfpm;
}
public BTree getBTree() {
return btree;
}
+ @Override
+ protected boolean isFull() {
+ return mfpm.isFull();
+ }
+
+ @Override
+ protected void reset() throws HyracksDataException {
+ btree.clear();
+ }
+
}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
index 9474354..a9c39f6 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
@@ -15,6 +15,9 @@
package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
+import java.util.LinkedList;
+import java.util.List;
+
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
@@ -25,6 +28,7 @@
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
public final class LSMBTreeOpContext implements ILSMIndexOperationContext {
@@ -40,6 +44,7 @@
public final MultiComparator cmp;
public final IModificationOperationCallback modificationCallback;
public final ISearchOperationCallback searchCallback;
+ private final List<ILSMComponent> componentHolder;
public LSMBTreeOpContext(BTree memBTree, ITreeIndexFrameFactory insertLeafFrameFactory,
ITreeIndexFrameFactory deleteLeafFrameFactory, IModificationOperationCallback modificationCallback,
@@ -61,18 +66,19 @@
if (deleteLeafFrame != null && this.cmp != null) {
deleteLeafFrame.setMultiComparator(cmp);
}
+ this.componentHolder = new LinkedList<ILSMComponent>();
this.modificationCallback = modificationCallback;
this.searchCallback = searchCallback;
}
@Override
public void setOperation(IndexOperation newOp) {
+ reset();
this.op = newOp;
switch (newOp) {
case SEARCH:
setMemBTreeAccessor();
break;
-
case DISKORDERSCAN:
case UPDATE:
// Attention: It is important to leave the leafFrame and
@@ -82,12 +88,10 @@
// previously requested operation.
setMemBTreeAccessor();
return;
-
case UPSERT:
case INSERT:
setInsertMode();
break;
-
case PHYSICALDELETE:
case DELETE:
setDeleteMode();
@@ -117,6 +121,7 @@
@Override
public void reset() {
+ componentHolder.clear();
}
public IndexOperation getOperation() {
@@ -124,6 +129,11 @@
}
@Override
+ public List<ILSMComponent> getComponentHolder() {
+ return componentHolder;
+ }
+
+ @Override
public ISearchOperationCallback getSearchOperationCallback() {
return searchCallback;
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponent.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponent.java
index 6d3f050..fa00f85 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponent.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponent.java
@@ -1,30 +1,10 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.api;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentState;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
public interface ILSMComponent {
- public void activate();
+ public boolean threadEnter(LSMOperationType opType) throws InterruptedException;
- public void deactivate();
-
- public void threadEnter();
-
- public void threadExit();
-
- public int getThreadReferenceCount();
-
- public void setState(LSMComponentState state);
-
- public boolean negativeCompareAndSet(LSMComponentState compare, LSMComponentState update);
-
- public LSMComponentState getState();
-
- // TODO: create two interfaces one for immutable and another for mutable components.
-
- // Only for immutable components.
- public void destroy() throws HyracksDataException;
-
- // Only for mutable components.
- public void reset() throws HyracksDataException;
+ public void threadExit(LSMOperationType opType, boolean failedOperation) throws HyracksDataException;
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 4805ee0..7b37c1a 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -15,8 +15,6 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.api;
-import java.util.List;
-
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
@@ -24,30 +22,29 @@
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
public interface ILSMHarness {
- public boolean insertUpdateOrDelete(ITupleReference tuple, ILSMIndexOperationContext ictx, boolean tryOperation)
+ public boolean modify(ILSMIndexOperationContext ictx, boolean tryOperation, ITupleReference tuple)
throws HyracksDataException, IndexException;
- public boolean noOp(ILSMIndexOperationContext ictx, boolean tryOperation) throws HyracksDataException;
+ public void noOp(ILSMIndexOperationContext ctx) throws HyracksDataException;
- public List<ILSMComponent> search(IIndexCursor cursor, ISearchPredicate pred, ILSMIndexOperationContext ctx,
- boolean includeMutableComponent) throws HyracksDataException, IndexException;
+ public void search(ILSMIndexOperationContext ctx, IIndexCursor cursor, ISearchPredicate pred)
+ throws HyracksDataException, IndexException;
- // Eventually includeMutableComponent and ctx should be removed.
- public void closeSearchCursor(List<ILSMComponent> operationalComponents, boolean includeMutableComponent,
- ILSMIndexOperationContext ctx) throws HyracksDataException;
+ public void endSearch(ILSMIndexOperationContext ctx) throws HyracksDataException;
- public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException,
+ public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException, IndexException;
+
+ public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
IndexException;
- public void merge(ILSMIOOperation operation) throws HyracksDataException, IndexException;
+ public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException;
- public void flush(ILSMIOOperation operation) throws HyracksDataException, IndexException;
+ public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
+ IndexException;
public void addBulkLoadedComponent(ILSMComponent index) throws HyracksDataException, IndexException;
- public ILSMIndex getIndex();
-
public ILSMOperationTracker getOperationTracker();
-
- public ILSMIOOperationScheduler getIOScheduler();
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index aaa5879..cc1388f 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -16,6 +16,8 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.api;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
/**
@@ -27,7 +29,8 @@
* concurrent searches/updates/merges may be ongoing.
*/
public interface ILSMIndex extends IIndex {
- public void setFlushStatus(ILSMIndex index, boolean needsFlush);
+ public ILSMIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
+ ISearchOperationCallback searchCallback);
public boolean getFlushStatus(ILSMIndex index);
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index 8ef03bf..f1c4c66 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -107,19 +107,8 @@
/**
* This method can be used to increase the number of 'active' operations of an index artificially,
* without actually modifying the index.
- * If the operation would have to wait for a flush then we return false.
- * Otherwise, beforeOperation() and afterOperation() of the ILSMOperationTracker are called,
- * and true is returned.
- *
- * @throws HyracksDataException
- */
- public boolean tryNoOp() throws HyracksDataException;
-
- /**
- * This method can be used to increase the number of 'active' operations of an index artificially,
- * without actually modifying the index.
- * This method may block waiting for a flush to finish, and will eventually call
- * beforeOperation() and afterOperation() of the ILSMOperationTracker.
+ * This method does not block and is guaranteed to trigger the {@link ILSMOperationTracker}'s beforeOperation
+ * and afterOperation calls.
*
* @throws HyracksDataException
*/
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java
index 53a5c00..e98165b 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java
@@ -22,39 +22,49 @@
import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
public interface ILSMIndexInternal extends ILSMIndex {
+ public ILSMIndexAccessorInternal createAccessor(IModificationOperationCallback modificationCallback,
+ ISearchOperationCallback searchCallback);
- public void insertUpdateOrDelete(ITupleReference tuple, IIndexOperationContext ictx) throws HyracksDataException,
- IndexException;
+ public void modify(IIndexOperationContext ictx, ITupleReference tuple) throws HyracksDataException, IndexException;
- public void search(IIndexCursor cursor, List<ILSMComponent> diskComponents, ISearchPredicate pred,
- IIndexOperationContext ictx, boolean includeMemComponent) throws HyracksDataException, IndexException;
+ public void search(ILSMIndexOperationContext ictx, IIndexCursor cursor, ISearchPredicate pred)
+ throws HyracksDataException, IndexException;
+
+ public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException;
public ILSMComponent flush(ILSMIOOperation operation) throws HyracksDataException, IndexException;
- public ILSMComponent merge(List<ILSMComponent> mergedComponents, ILSMIOOperation operation)
+ public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException, IndexException;
- public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException,
- IndexException;
+ public ILSMComponent merge(List<ILSMComponent> mergedComponents, ILSMIOOperation operation)
+ throws HyracksDataException, IndexException;
public void addComponent(ILSMComponent index);
public void subsumeMergedComponents(ILSMComponent newComponent, List<ILSMComponent> mergedComponents);
- public List<ILSMComponent> getOperationalComponents(IIndexOperationContext ctx);
+ /**
+ * Populates the context's component holder with a snapshot of the components involved in the operation.
+ *
+ * @param ctx
+ * - the operation's context
+ */
+ public void getOperationalComponents(ILSMIndexOperationContext ctx);
public IInMemoryFreePageManager getInMemoryFreePageManager();
- public void resetMutableComponent() throws HyracksDataException;
-
- public ILSMComponent getMutableComponent();
-
public List<ILSMComponent> getImmutableComponents();
public void markAsValid(ILSMComponent lsmComponent) throws HyracksDataException;
+ public void setFlushStatus(ILSMIndex index, boolean needsFlush);
+
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
index 9230fe4..864d0e7 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
@@ -1,10 +1,15 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+import java.util.List;
+
import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
public interface ILSMIndexOperationContext extends IIndexOperationContext {
+ public List<ILSMComponent> getComponentHolder();
+
public ISearchOperationCallback getSearchOperationCallback();
+
public IModificationOperationCallback getModificationCallback();
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTracker.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTracker.java
index f07eaf9..c3f1f3e 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTracker.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTracker.java
@@ -4,6 +4,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
/**
* This interface exposes methods for tracking and setting the status of operations for the purpose
@@ -21,8 +22,8 @@
* then this method does not block and returns false.
* Otherwise, this method returns true, and the operation is considered 'active' in the index.
*/
- public boolean beforeOperation(ISearchOperationCallback searchCallback,
- IModificationOperationCallback modificationCallback, boolean tryOperation) throws HyracksDataException;
+ public void beforeOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
+ IModificationOperationCallback modificationCallback) throws HyracksDataException;
/**
* An {@link ILSMIndex} will call this method after an operation has left the index,
@@ -30,7 +31,7 @@
* After this method has been called, the operation is still considered 'active',
* until the issuer of the operation declares it completed by calling completeOperation().
*/
- public void afterOperation(ISearchOperationCallback searchCallback,
+ public void afterOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException;
/**
@@ -38,6 +39,6 @@
* The use of this method indicates that the operation is no longer 'active'
* for the purpose of coordinating flushes/merges.
*/
- public void completeOperation(ISearchOperationCallback searchCallback,
+ public void completeOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException;
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
index 51f979f..baa9648 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -46,11 +46,7 @@
if (tupleFilter != null) {
frameTuple.reset(accessor, i);
if (!tupleFilter.accept(frameTuple)) {
- if (!lsmAccessor.tryNoOp()) {
- flushPartialFrame(lastFlushedTupleIndex, i);
- lastFlushedTupleIndex = (i == 0) ? 0 : i - 1;
- lsmAccessor.noOp();
- }
+ lsmAccessor.noOp();
continue;
}
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractImmutableLSMComponent.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractImmutableLSMComponent.java
new file mode 100644
index 0000000..5382b81
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractImmutableLSMComponent.java
@@ -0,0 +1,68 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+
+public abstract class AbstractImmutableLSMComponent implements ILSMComponent {
+
+ private ComponentState state;
+ private int readerCount;
+
+ private enum ComponentState {
+ READABLE,
+ READABLE_MERGING,
+ KILLED
+ }
+
+ public AbstractImmutableLSMComponent() {
+ state = ComponentState.READABLE;
+ readerCount = 0;
+ }
+
+ @Override
+ public synchronized boolean threadEnter(LSMOperationType opType) {
+ if (state == ComponentState.KILLED) {
+ return false;
+ }
+
+ switch (opType) {
+ case MODIFICATION:
+ case SEARCH:
+ readerCount++;
+ break;
+ case MERGE:
+ if (state == ComponentState.READABLE_MERGING) {
+ return false;
+ }
+ state = ComponentState.READABLE_MERGING;
+ readerCount++;
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported operation " + opType);
+ }
+ return true;
+ }
+
+ @Override
+ public synchronized void threadExit(LSMOperationType opType, boolean failedOperation) throws HyracksDataException {
+ switch (opType) {
+ case MERGE:
+ if (failedOperation) {
+ state = ComponentState.READABLE;
+ }
+ case MODIFICATION:
+ case SEARCH:
+ readerCount--;
+ if (readerCount == 0 && state == ComponentState.READABLE_MERGING) {
+ destroy();
+ state = ComponentState.KILLED;
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported operation " + opType);
+ }
+ }
+
+ protected abstract void destroy() throws HyracksDataException;
+
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java
deleted file mode 100644
index 356f314..0000000
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
-
-public abstract class AbstractLSMComponent implements ILSMComponent {
-
- private final AtomicInteger threadRef = new AtomicInteger();
- private LSMComponentState state;
-
- @Override
- public void activate() {
-
- }
-
- @Override
- public void deactivate() {
-
- }
-
- @Override
- public void threadEnter() {
- threadRef.incrementAndGet();
- }
-
- @Override
- public void threadExit() {
- threadRef.decrementAndGet();
- }
-
- @Override
- public int getThreadReferenceCount() {
- return threadRef.get();
- }
-
- @Override
- public void setState(LSMComponentState state) {
- synchronized (this) {
- this.state = state;
- }
- }
-
- @Override
- public LSMComponentState getState() {
- synchronized (this) {
- return state;
- }
- }
-
- @Override
- public boolean negativeCompareAndSet(LSMComponentState compare, LSMComponentState update) {
- synchronized (this) {
- if (state != compare) {
- state = update;
- return true;
- }
- }
- return false;
- }
-}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index da9da97..54ccd37 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -15,8 +15,10 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
@@ -39,6 +41,8 @@
public abstract class AbstractLSMIndex implements ILSMIndexInternal {
protected final ILSMHarness lsmHarness;
+ protected final ILSMIOOperationScheduler ioScheduler;
+
// In-memory components.
protected final IInMemoryFreePageManager memFreePageManager;
@@ -46,7 +50,7 @@
protected final IBufferCache diskBufferCache;
protected final ILSMIndexFileManager fileManager;
protected final IFileMapProvider diskFileMapProvider;
- protected final LinkedList<ILSMComponent> immutableComponents;
+ protected final AtomicReference<List<ILSMComponent>> componentsRef;
protected boolean isActivated;
@@ -58,11 +62,13 @@
this.memFreePageManager = memFreePageManager;
this.diskBufferCache = diskBufferCache;
this.diskFileMapProvider = diskFileMapProvider;
- this.immutableComponents = new LinkedList<ILSMComponent>();
this.fileManager = fileManager;
+ this.ioScheduler = ioScheduler;
ILSMOperationTracker opTracker = opTrackerFactory.createOperationTracker(this);
- lsmHarness = new LSMHarness(this, mergePolicy, opTracker, ioScheduler);
+ lsmHarness = new LSMHarness(this, mergePolicy, opTracker);
isActivated = false;
+ componentsRef = new AtomicReference<List<ILSMComponent>>();
+ componentsRef.set(new LinkedList<ILSMComponent>());
}
protected void forceFlushDirtyPages(ITreeIndex treeIndex) throws HyracksDataException {
@@ -119,15 +125,30 @@
}
@Override
- public void addComponent(ILSMComponent index) {
- immutableComponents.addFirst(index);
+ public void addComponent(ILSMComponent c) {
+ List<ILSMComponent> oldList = componentsRef.get();
+ List<ILSMComponent> newList = new ArrayList<ILSMComponent>();
+ newList.add(c);
+ for (ILSMComponent oc : oldList) {
+ newList.add(oc);
+ }
+ componentsRef.set(newList);
}
@Override
public void subsumeMergedComponents(ILSMComponent newComponent, List<ILSMComponent> mergedComponents) {
- int firstComponentIndex = immutableComponents.indexOf(mergedComponents.get(0));
- immutableComponents.removeAll(mergedComponents);
- immutableComponents.add(firstComponentIndex, newComponent);
+ List<ILSMComponent> oldList = componentsRef.get();
+ List<ILSMComponent> newList = new ArrayList<ILSMComponent>();
+ int swapIndex = oldList.indexOf(mergedComponents.get(0));
+ int swapSize = mergedComponents.size();
+ for (int i = 0; i < oldList.size(); i++) {
+ if (i < swapIndex || i >= swapIndex + swapSize) {
+ newList.add(oldList.get(i));
+ } else if (i == swapIndex) {
+ newList.add(newComponent);
+ }
+ }
+ componentsRef.set(newList);
}
@Override
@@ -137,7 +158,7 @@
@Override
public List<ILSMComponent> getImmutableComponents() {
- return immutableComponents;
+ return componentsRef.get();
}
@Override
@@ -157,7 +178,7 @@
@Override
public ILSMIOOperationScheduler getIOScheduler() {
- return lsmHarness.getIOScheduler();
+ return ioScheduler;
}
@Override
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMutableLSMComponent.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMutableLSMComponent.java
new file mode 100644
index 0000000..6575fea
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMutableLSMComponent.java
@@ -0,0 +1,97 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+
+public abstract class AbstractMutableLSMComponent implements ILSMComponent {
+
+ private int readerCount;
+ private int writerCount;
+ private ComponentState state;
+
+ private enum ComponentState {
+ READABLE_WRITABLE,
+ READABLE_UNWRITABLE,
+ READABLE_UNWRITABLE_FLUSHING,
+ UNREADABLE_UNWRITABLE
+ }
+
+ public AbstractMutableLSMComponent() {
+ readerCount = 0;
+ writerCount = 0;
+ state = ComponentState.READABLE_WRITABLE;
+ }
+
+ @Override
+ public synchronized boolean threadEnter(LSMOperationType opType) throws InterruptedException {
+ switch (opType) {
+ case MODIFICATION:
+ while (state != ComponentState.READABLE_WRITABLE) {
+ return false;
+ }
+ writerCount++;
+ break;
+ case SEARCH:
+ while (state == ComponentState.UNREADABLE_UNWRITABLE) {
+ return false;
+ }
+ readerCount++;
+ break;
+ case FLUSH:
+ if (state == ComponentState.READABLE_UNWRITABLE_FLUSHING
+ || state == ComponentState.UNREADABLE_UNWRITABLE) {
+ return false;
+ }
+
+ state = ComponentState.READABLE_UNWRITABLE_FLUSHING;
+ while (writerCount > 0) {
+ wait();
+ }
+ readerCount++;
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported operation " + opType);
+ }
+ return true;
+ }
+
+ @Override
+ public synchronized void threadExit(LSMOperationType opType, boolean failedOperation) throws HyracksDataException {
+ switch (opType) {
+ case MODIFICATION:
+ writerCount--;
+ if (state == ComponentState.READABLE_WRITABLE && isFull()) {
+ state = ComponentState.READABLE_UNWRITABLE;
+ }
+ break;
+ case SEARCH:
+ readerCount--;
+ if (state == ComponentState.UNREADABLE_UNWRITABLE && readerCount == 0) {
+ reset();
+ state = ComponentState.READABLE_WRITABLE;
+ } else if (state == ComponentState.READABLE_WRITABLE && isFull()) {
+ state = ComponentState.READABLE_UNWRITABLE;
+ }
+ break;
+ case FLUSH:
+ if (failedOperation) {
+ state = isFull() ? ComponentState.READABLE_UNWRITABLE : ComponentState.READABLE_WRITABLE;
+ }
+ readerCount--;
+ if (readerCount == 0) {
+ reset();
+ state = ComponentState.READABLE_WRITABLE;
+ } else if (state == ComponentState.READABLE_UNWRITABLE_FLUSHING) {
+ state = ComponentState.UNREADABLE_UNWRITABLE;
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported operation " + opType);
+ }
+ notifyAll();
+ }
+
+ protected abstract boolean isFull();
+
+ protected abstract void reset() throws HyracksDataException;
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMFlushOperation.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMFlushOperation.java
index 9cffb3c..6ce1f08 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMFlushOperation.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMFlushOperation.java
@@ -7,6 +7,7 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IODeviceHandle;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
@@ -14,12 +15,14 @@
public class LSMFlushOperation implements ILSMIOOperation {
private final ILSMIndexAccessorInternal accessor;
+ private final ILSMComponent flushingComponent;
private final FileReference flushTarget;
private final ILSMIOOperationCallback callback;
- public LSMFlushOperation(ILSMIndexAccessorInternal accessor, FileReference flushTarget,
- ILSMIOOperationCallback callback) {
+ public LSMFlushOperation(ILSMIndexAccessorInternal accessor, ILSMComponent flushingComponent,
+ FileReference flushTarget, ILSMIOOperationCallback callback) {
this.accessor = accessor;
+ this.flushingComponent = flushingComponent;
this.flushTarget = flushTarget;
this.callback = callback;
}
@@ -48,4 +51,11 @@
return flushTarget;
}
+ public ILSMIndexAccessorInternal getAccessor() {
+ return accessor;
+ }
+
+ public ILSMComponent getFlushingComponent() {
+ return flushingComponent;
+ }
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index f4aa944..9759322 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -17,236 +17,208 @@
import java.util.ArrayList;
import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexInternal;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
-/**
- * Common code for synchronizing LSM operations like
- * updates/searches/flushes/merges on any {@link ILSMIndex}. This class only deals with
- * synchronizing LSM operations, and delegates the concrete implementations of
- * actual operations to {@link ILSMIndex} (passed in the constructor).
- * Concurrency behavior:
- * All operations except merge (insert/update/delete/search) are blocked during a flush.
- * During a merge, all operations (except another merge) can proceed concurrently.
- * A merge and a flush can proceed concurrently.
- */
public class LSMHarness implements ILSMHarness {
- private final Logger LOGGER = Logger.getLogger(LSMHarness.class.getName());
-
private final ILSMIndexInternal lsmIndex;
-
private final ILSMMergePolicy mergePolicy;
private final ILSMOperationTracker opTracker;
- private final ILSMIOOperationScheduler ioScheduler;
- public LSMHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
- ILSMIOOperationScheduler ioScheduler) {
+ public LSMHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker) {
this.lsmIndex = lsmIndex;
this.opTracker = opTracker;
this.mergePolicy = mergePolicy;
- this.ioScheduler = ioScheduler;
}
- private void threadExit(ILSMIndexOperationContext opCtx) throws HyracksDataException {
+ private void threadExit(ILSMIndexOperationContext opCtx, LSMOperationType opType) throws HyracksDataException {
if (!lsmIndex.getFlushStatus(lsmIndex) && lsmIndex.getInMemoryFreePageManager().isFull()) {
lsmIndex.setFlushStatus(lsmIndex, true);
}
- opTracker.afterOperation(opCtx.getSearchOperationCallback(), opCtx.getModificationCallback());
+ if (opTracker == null) {
+ System.out.println("break");
+ }
+ if (opCtx == null) {
+ System.out.println("break");
+ }
+ opTracker.afterOperation(opType, opCtx.getSearchOperationCallback(), opCtx.getModificationCallback());
+ }
+
+ private boolean getAndEnterComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, boolean tryOperation)
+ throws HyracksDataException {
+ int numEntered = 0;
+ boolean entranceSuccessful = false;
+ List<ILSMComponent> entered = new ArrayList<ILSMComponent>();
+
+ while (!entranceSuccessful) {
+ entered.clear();
+ lsmIndex.getOperationalComponents(ctx);
+ List<ILSMComponent> components = ctx.getComponentHolder();
+ try {
+ for (ILSMComponent c : components) {
+ if (!c.threadEnter(opType)) {
+ break;
+ }
+ numEntered++;
+ entered.add(c);
+ }
+ entranceSuccessful = numEntered == components.size();
+ } catch (InterruptedException e) {
+ entranceSuccessful = false;
+ throw new HyracksDataException(e);
+ } finally {
+ if (!entranceSuccessful) {
+ for (ILSMComponent c : components) {
+ if (numEntered <= 0) {
+ break;
+ }
+ c.threadExit(opType, true);
+ numEntered--;
+ }
+ }
+ }
+ if (tryOperation && !entranceSuccessful) {
+ return false;
+ }
+ }
+
+ opTracker.beforeOperation(opType, ctx.getSearchOperationCallback(), ctx.getModificationCallback());
+ return true;
+ }
+
+ private void exitComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, boolean failedOperation)
+ throws HyracksDataException {
+ try {
+ for (ILSMComponent c : ctx.getComponentHolder()) {
+ c.threadExit(opType, failedOperation);
+ }
+ } finally {
+ threadExit(ctx, opType);
+ }
}
@Override
- public boolean insertUpdateOrDelete(ITupleReference tuple, ILSMIndexOperationContext ctx, boolean tryOperation)
+ public boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple)
throws HyracksDataException, IndexException {
- if (!opTracker.beforeOperation(ctx.getSearchOperationCallback(), ctx.getModificationCallback(), tryOperation)) {
+ LSMOperationType opType = LSMOperationType.MODIFICATION;
+ if (!getAndEnterComponents(ctx, opType, tryOperation)) {
return false;
}
try {
- lsmIndex.insertUpdateOrDelete(tuple, ctx);
+ lsmIndex.modify(ctx, tuple);
} finally {
- threadExit(ctx);
+ exitComponents(ctx, opType, false);
}
return true;
}
@Override
- public boolean noOp(ILSMIndexOperationContext ctx, boolean tryOperation) throws HyracksDataException {
- if (!opTracker.beforeOperation(ctx.getSearchOperationCallback(), ctx.getModificationCallback(), tryOperation)) {
- return false;
+ public void search(ILSMIndexOperationContext ctx, IIndexCursor cursor, ISearchPredicate pred)
+ throws HyracksDataException, IndexException {
+ LSMOperationType opType = LSMOperationType.SEARCH;
+ getAndEnterComponents(ctx, opType, false);
+ try {
+ lsmIndex.search(ctx, cursor, pred);
+ } catch (HyracksDataException e) {
+ exitComponents(ctx, opType, true);
+ throw e;
+ } catch (IndexException e) {
+ exitComponents(ctx, opType, true);
+ throw e;
}
- threadExit(ctx);
- return true;
}
@Override
- public void flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Flushing LSM-Index: " + lsmIndex);
+ public void endSearch(ILSMIndexOperationContext ctx) throws HyracksDataException {
+ if (ctx.getOperation() == IndexOperation.SEARCH) {
+ exitComponents(ctx, LSMOperationType.SEARCH, false);
}
- ILSMComponent newComponent = null;
- try {
- operation.getCallback().beforeOperation(operation);
- newComponent = lsmIndex.flush(operation);
- operation.getCallback().afterOperation(operation, null, newComponent);
+ }
- // The implementation of this call must take any necessary steps to make
- // the new component permanent, and mark it as valid (usually this means
- // forcing all pages of the tree to disk, possibly with some extra
- // information to mark the tree as valid).
- lsmIndex.markAsValid(newComponent);
- } finally {
- operation.getCallback().afterFinalize(operation, newComponent);
- }
- lsmIndex.resetMutableComponent();
- synchronized (this) {
- lsmIndex.addComponent(newComponent);
- mergePolicy.diskComponentAdded(lsmIndex, lsmIndex.getImmutableComponents().size());
- }
+ @Override
+ public void noOp(ILSMIndexOperationContext ctx) throws HyracksDataException {
+ LSMOperationType opType = LSMOperationType.NOOP;
+ opTracker.beforeOperation(opType, ctx.getSearchOperationCallback(), ctx.getModificationCallback());
+ threadExit(ctx, opType);
+ }
- // Unblock entering threads waiting for the flush
+ @Override
+ public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException {
+ if (!getAndEnterComponents(ctx, LSMOperationType.FLUSH, true)) {
+ return;
+ }
lsmIndex.setFlushStatus(lsmIndex, false);
+ lsmIndex.scheduleFlush(ctx, callback);
}
@Override
- public List<ILSMComponent> search(IIndexCursor cursor, ISearchPredicate pred, ILSMIndexOperationContext ctx,
- boolean includeMutableComponent) throws HyracksDataException, IndexException {
- // If the search doesn't include the in-memory component, then we don't have
- // to synchronize with a flush.
- if (includeMutableComponent) {
- opTracker.beforeOperation(ctx.getSearchOperationCallback(), ctx.getModificationCallback(), false);
- }
-
- // Get a snapshot of the current on-disk Trees.
- // If includeMutableComponent is true, then no concurrent
- // flush can add another on-disk Tree (due to threadEnter());
- // If includeMutableComponent is false, then it is possible that a concurrent
- // flush adds another on-disk Tree.
- // Since this mode is only used for merging trees, it doesn't really
- // matter if the merge excludes the new on-disk Tree.
- List<ILSMComponent> operationalComponents;
- synchronized (this) {
- operationalComponents = lsmIndex.getOperationalComponents(ctx);
- }
- for (ILSMComponent c : operationalComponents) {
- c.threadEnter();
- }
-
- lsmIndex.search(cursor, operationalComponents, pred, ctx, includeMutableComponent);
- return operationalComponents;
- }
-
- @Override
- public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException,
+ public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
IndexException {
- ILSMIOOperation mergeOp = lsmIndex.createMergeOperation(callback);
- return mergeOp;
+ operation.getCallback().beforeOperation(operation);
+ ILSMComponent newComponent = lsmIndex.flush(operation);
+ operation.getCallback().afterOperation(operation, null, newComponent);
+ lsmIndex.markAsValid(newComponent);
+ operation.getCallback().afterFinalize(operation, newComponent);
+
+ lsmIndex.addComponent(newComponent);
+ int numComponents = lsmIndex.getImmutableComponents().size();
+
+ mergePolicy.diskComponentAdded(lsmIndex, numComponents);
+ exitComponents(ctx, LSMOperationType.FLUSH, false);
}
@Override
- public void merge(ILSMIOOperation operation) throws HyracksDataException, IndexException {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Merging LSM-Index: " + lsmIndex);
+ public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException, IndexException {
+ LSMOperationType opType = LSMOperationType.MERGE;
+ if (!getAndEnterComponents(ctx, opType, false)) {
+ return;
}
+ if (ctx.getComponentHolder().size() > 1) {
+ lsmIndex.scheduleMerge(ctx, callback);
+ } else {
+ exitComponents(ctx, opType, true);
+ }
+ }
+ @Override
+ public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
+ IndexException {
List<ILSMComponent> mergedComponents = new ArrayList<ILSMComponent>();
- ILSMComponent newComponent = null;
- try {
- operation.getCallback().beforeOperation(operation);
- newComponent = lsmIndex.merge(mergedComponents, operation);
- operation.getCallback().afterOperation(operation, mergedComponents, newComponent);
-
- // No merge happened.
- if (newComponent == null) {
- return;
- }
-
- // TODO: Move this to just before the merge cleanup and remove latching on disk components
- // The implementation of this call must take any necessary steps to make
- // the new component permanent, and mark it as valid (usually this means
- // forcing all pages of the tree to disk, possibly with some extra
- // information to mark the tree as valid).
- lsmIndex.markAsValid(newComponent);
- } finally {
- operation.getCallback().afterFinalize(operation, newComponent);
- }
-
- // Remove the old components from the list, and add the new merged component(s).
- try {
- synchronized (this) {
- lsmIndex.subsumeMergedComponents(newComponent, mergedComponents);
- }
- } finally {
- // Cleanup merged components in case there are no more searchers accessing them.
- for (ILSMComponent c : mergedComponents) {
- c.setState(LSMComponentState.DONE_MERGING);
- if (c.getThreadReferenceCount() == 0) {
- c.destroy();
- }
- }
- }
+ operation.getCallback().beforeOperation(operation);
+ ILSMComponent newComponent = lsmIndex.merge(mergedComponents, operation);
+ operation.getCallback().afterOperation(operation, mergedComponents, newComponent);
+ lsmIndex.markAsValid(newComponent);
+ operation.getCallback().afterFinalize(operation, newComponent);
+ lsmIndex.subsumeMergedComponents(newComponent, mergedComponents);
+ exitComponents(ctx, LSMOperationType.MERGE, false);
}
@Override
- public void closeSearchCursor(List<ILSMComponent> operationalComponents, boolean includeMutableComponent,
- ILSMIndexOperationContext ctx) throws HyracksDataException {
- // TODO: we should not worry about the mutable component.
- if (includeMutableComponent) {
- threadExit(ctx);
- }
- try {
- for (ILSMComponent c : operationalComponents) {
- c.threadExit();
- }
- } finally {
- for (ILSMComponent c : operationalComponents) {
- if (c.getState() == LSMComponentState.DONE_MERGING && c.getThreadReferenceCount() == 0) {
- c.destroy();
- }
- }
- }
- }
-
- @Override
- public void addBulkLoadedComponent(ILSMComponent index) throws HyracksDataException, IndexException {
- // The implementation of this call must take any necessary steps to make
- // the new component permanent, and mark it as valid (usually this means
- // forcing all pages of the tree to disk, possibly with some extra
- // information to mark the tree as valid).
- lsmIndex.markAsValid(index);
- synchronized (this) {
- lsmIndex.addComponent(index);
- mergePolicy.diskComponentAdded(lsmIndex, lsmIndex.getImmutableComponents().size());
- }
+ public void addBulkLoadedComponent(ILSMComponent c) throws HyracksDataException, IndexException {
+ lsmIndex.markAsValid(c);
+ lsmIndex.addComponent(c);
+ int numComponents = lsmIndex.getImmutableComponents().size();
+ mergePolicy.diskComponentAdded(lsmIndex, numComponents);
}
@Override
public ILSMOperationTracker getOperationTracker() {
return opTracker;
}
-
- @Override
- public ILSMIOOperationScheduler getIOScheduler() {
- return ioScheduler;
- }
-
- @Override
- public ILSMIndex getIndex() {
- return lsmIndex;
- }
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
index 3b66c72..92d3679 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
@@ -83,7 +83,7 @@
rangeCursors = null;
if (searcherRefCount != null) {
- lsmHarness.closeSearchCursor(operationalComponents, includeMemComponent, opCtx);
+ lsmHarness.endSearch(opCtx);
}
}
@@ -115,7 +115,7 @@
}
rangeCursors = null;
} finally {
- lsmHarness.closeSearchCursor(operationalComponents, includeMemComponent, opCtx);
+ lsmHarness.endSearch(opCtx);
}
}
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMOperationType.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMOperationType.java
new file mode 100644
index 0000000..1a97cb2
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMOperationType.java
@@ -0,0 +1,9 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+public enum LSMOperationType {
+ SEARCH,
+ MODIFICATION,
+ FLUSH,
+ MERGE,
+ NOOP
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index d60c758..ef0e0e7 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -39,90 +39,90 @@
@Override
public void insert(ITupleReference tuple) throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.INSERT);
- lsmHarness.insertUpdateOrDelete(tuple, ctx, false);
+ lsmHarness.modify(ctx, false, tuple);
}
@Override
public void update(ITupleReference tuple) throws HyracksDataException, IndexException {
// Update is the same as insert.
ctx.setOperation(IndexOperation.UPDATE);
- lsmHarness.insertUpdateOrDelete(tuple, ctx, false);
+ lsmHarness.modify(ctx, false, tuple);
}
@Override
public void delete(ITupleReference tuple) throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.DELETE);
- lsmHarness.insertUpdateOrDelete(tuple, ctx, false);
+ lsmHarness.modify(ctx, false, tuple);
}
@Override
public void upsert(ITupleReference tuple) throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.UPSERT);
- lsmHarness.insertUpdateOrDelete(tuple, ctx, false);
+ lsmHarness.modify(ctx, false, tuple);
}
@Override
public boolean tryInsert(ITupleReference tuple) throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.INSERT);
- return lsmHarness.insertUpdateOrDelete(tuple, ctx, true);
+ return lsmHarness.modify(ctx, true, tuple);
}
@Override
public boolean tryDelete(ITupleReference tuple) throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.DELETE);
- return lsmHarness.insertUpdateOrDelete(tuple, ctx, true);
+ return lsmHarness.modify(ctx, true, tuple);
}
@Override
public boolean tryUpdate(ITupleReference tuple) throws HyracksDataException, IndexException {
// Update is the same as insert.
ctx.setOperation(IndexOperation.UPDATE);
- return lsmHarness.insertUpdateOrDelete(tuple, ctx, true);
+ return lsmHarness.modify(ctx, true, tuple);
}
@Override
public boolean tryUpsert(ITupleReference tuple) throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.UPSERT);
- return lsmHarness.insertUpdateOrDelete(tuple, ctx, true);
+ return lsmHarness.modify(ctx, true, tuple);
}
@Override
public void search(IIndexCursor cursor, ISearchPredicate searchPred) throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.SEARCH);
- lsmHarness.search(cursor, searchPred, ctx, true);
+ lsmHarness.search(ctx, cursor, searchPred);
}
@Override
public void flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
- lsmHarness.flush(operation);
+ lsmHarness.flush(ctx, operation);
}
@Override
public void merge(ILSMIOOperation operation) throws HyracksDataException, IndexException {
- lsmHarness.merge(operation);
+ ctx.setOperation(IndexOperation.MERGE);
+ lsmHarness.merge(ctx, operation);
}
@Override
public void physicalDelete(ITupleReference tuple) throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.PHYSICALDELETE);
- lsmHarness.insertUpdateOrDelete(tuple, ctx, false);
+ lsmHarness.modify(ctx, false, tuple);
+ }
+
+ @Override
+ public void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException {
+ ctx.setOperation(IndexOperation.FLUSH);
+ lsmHarness.scheduleFlush(ctx, callback);
}
@Override
public void scheduleMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException {
- ILSMIOOperation op = lsmHarness.createMergeOperation(callback);
- if (op != null) {
- lsmHarness.getIOScheduler().scheduleOperation(op);
- }
- }
-
- @Override
- public boolean tryNoOp() throws HyracksDataException {
- return lsmHarness.noOp(ctx, true);
+ ctx.setOperation(IndexOperation.MERGE);
+ lsmHarness.scheduleMerge(ctx, callback);
}
@Override
public void noOp() throws HyracksDataException {
- lsmHarness.noOp(ctx, false);
+ lsmHarness.noOp(ctx);
}
}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java
index e4c584a..97ec50e 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java
@@ -4,7 +4,6 @@
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
@@ -24,21 +23,18 @@
return new ILSMOperationTracker() {
@Override
- public void completeOperation(ISearchOperationCallback searchCallback,
+ public void completeOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
// Do nothing.
}
@Override
- public boolean beforeOperation(ISearchOperationCallback searchCallback,
- IModificationOperationCallback modificationCallback, boolean tryOperation)
- throws HyracksDataException {
- // Do nothing.
- return true;
+ public void beforeOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
+ IModificationOperationCallback modificationCallback) throws HyracksDataException {
}
@Override
- public void afterOperation(ISearchOperationCallback searchCallback,
+ public void afterOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
// Do nothing.
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ReferenceCountingOperationTracker.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ReferenceCountingOperationTracker.java
deleted file mode 100644
index a162fc1..0000000
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ReferenceCountingOperationTracker.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
-import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
-
-public class ReferenceCountingOperationTracker implements ILSMOperationTracker {
-
- private int threadRefCount = 0;
- private final ILSMIndex index;
- private final FlushOperationCallback FLUSHCALLBACK_INSTANCE = new FlushOperationCallback();
-
- public ReferenceCountingOperationTracker(ILSMIndex index) {
- this.index = index;
- }
-
- @Override
- public synchronized boolean beforeOperation(ISearchOperationCallback searchCallback,
- IModificationOperationCallback modificationCallback, boolean tryOperation) throws HyracksDataException {
- // Wait for pending flushes to complete.
- // If flushFlag is set, then the flush is queued to occur by the last exiting thread.
- // This operation should wait for that flush to occur before proceeding.
- if (index.getFlushStatus(index)) {
- if (tryOperation) {
- return false;
- }
- try {
- this.wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
- }
- threadRefCount++;
- return true;
- }
-
- @Override
- public void afterOperation(ISearchOperationCallback searchCallback,
- IModificationOperationCallback modificationCallback) throws HyracksDataException {
- // The operation is considered inactive, immediately after leaving the index.
- completeOperation(searchCallback, modificationCallback);
- }
-
- @Override
- public synchronized void completeOperation(ISearchOperationCallback searchCallback,
- IModificationOperationCallback modificationCallback) throws HyracksDataException {
- threadRefCount--;
-
- // Flush will only be handled by last exiting thread.
- if (index.getFlushStatus(index) && threadRefCount == 0) {
- ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE);
- accessor.scheduleFlush(FLUSHCALLBACK_INSTANCE);
- }
- }
-
- private class FlushOperationCallback implements ILSMIOOperationCallback {
- @Override
- public void beforeOperation(ILSMIOOperation operation) throws HyracksDataException {
- // Do nothing.
- }
-
- @Override
- public void afterOperation(ILSMIOOperation operation, List<ILSMComponent> oldComponents,
- ILSMComponent newComponent) throws HyracksDataException {
- // Do nothing.
- }
-
- @Override
- public void afterFinalize(ILSMIOOperation operation, ILSMComponent newComponent) throws HyracksDataException {
- ReferenceCountingOperationTracker.this.notifyAll();
- }
- }
-}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTrackerFactory.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ThreadCountingOperationTrackerFactory.java
similarity index 60%
rename from hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTrackerFactory.java
rename to hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ThreadCountingOperationTrackerFactory.java
index aa4e551..3b4b00f 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTrackerFactory.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ThreadCountingOperationTrackerFactory.java
@@ -4,18 +4,18 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
-public class RefCountingOperationTrackerFactory implements ILSMOperationTrackerFactory {
+public class ThreadCountingOperationTrackerFactory implements ILSMOperationTrackerFactory {
private static final long serialVersionUID = 1L;
- public static RefCountingOperationTrackerFactory INSTANCE = new RefCountingOperationTrackerFactory();
+ public static ThreadCountingOperationTrackerFactory INSTANCE = new ThreadCountingOperationTrackerFactory();
@Override
public ILSMOperationTracker createOperationTracker(ILSMIndex index) {
- return new ReferenceCountingOperationTracker(index);
+ return new ThreadCountingTracker(index);
}
// Enforce singleton.
- private RefCountingOperationTrackerFactory() {
+ private ThreadCountingOperationTrackerFactory() {
}
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java
new file mode 100644
index 0000000..7fee06e
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java
@@ -0,0 +1,49 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+
+public class ThreadCountingTracker implements ILSMOperationTracker {
+ private final AtomicInteger threadRefCount;
+ private final ILSMIndex index;
+
+ public ThreadCountingTracker(ILSMIndex index) {
+ this.index = index;
+ this.threadRefCount = new AtomicInteger();
+ }
+
+ @Override
+ public void beforeOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
+ IModificationOperationCallback modificationCallback) throws HyracksDataException {
+ if (opType == LSMOperationType.MODIFICATION) {
+ threadRefCount.incrementAndGet();
+ }
+ }
+
+ @Override
+ public void afterOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
+ IModificationOperationCallback modificationCallback) throws HyracksDataException {
+ // The operation is considered inactive, immediately after leaving the index.
+ completeOperation(opType, searchCallback, modificationCallback);
+ }
+
+ @Override
+ public void completeOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
+ IModificationOperationCallback modificationCallback) throws HyracksDataException {
+ // Flush will only be handled by last exiting thread.
+ if (opType == LSMOperationType.MODIFICATION) {
+ if (threadRefCount.decrementAndGet() == 0 && index.getFlushStatus(index)) {
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ accessor.scheduleFlush(NoOpIOOperationCallback.INSTANCE);
+ }
+ }
+ }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 0da39d8..b9834b0 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -54,6 +54,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
@@ -61,7 +62,6 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentState;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory.InMemoryInvertedIndex;
@@ -77,7 +77,7 @@
public class LSMInvertedIndex extends AbstractLSMIndex implements IInvertedIndex {
// In-memory components.
- protected final LSMInvertedIndexComponent mutableComponent;
+ protected final LSMInvertedIndexMutableComponent mutableComponent;
protected final IInMemoryFreePageManager memFreePageManager;
protected final IBinaryTokenizerFactory tokenizerFactory;
@@ -111,16 +111,10 @@
BTree deleteKeysBTree = BTreeUtils.createBTree(memBufferCache, memFreePageManager,
((InMemoryBufferCache) memBufferCache).getFileMapProvider(), invListTypeTraits, invListCmpFactories,
BTreeLeafFrameType.REGULAR_NSM, new FileReference(new File("membtree")));
- mutableComponent = new LSMInvertedIndexComponent(memInvIndex, deleteKeysBTree);
+ mutableComponent = new LSMInvertedIndexMutableComponent(memInvIndex, deleteKeysBTree, memFreePageManager);
componentFactory = new LSMInvertedIndexComponentFactory(diskInvIndexFactory, deletedKeysBTreeFactory);
}
- protected InMemoryInvertedIndex createInMemoryInvertedIndex(IInMemoryBufferCache memBufferCache)
- throws IndexException {
- return InvertedIndexUtils.createInMemoryBTreeInvertedindex(memBufferCache, memFreePageManager,
- invListTypeTraits, invListCmpFactories, tokenTypeTraits, tokenCmpFactories, tokenizerFactory);
- }
-
@Override
public synchronized void create() throws HyracksDataException {
if (isActivated) {
@@ -129,7 +123,7 @@
fileManager.deleteDirs();
fileManager.createDirs();
- immutableComponents.clear();
+ componentsRef.get().clear();
}
@Override
@@ -138,6 +132,7 @@
return;
}
try {
+ List<ILSMComponent> immutableComponents = componentsRef.get();
((InMemoryBufferCache) mutableComponent.getInvIndex().getBufferCache()).open();
mutableComponent.getInvIndex().create();
mutableComponent.getInvIndex().activate();
@@ -146,7 +141,7 @@
immutableComponents.clear();
List<LSMComponentFileReferences> validFileReferences = fileManager.cleanupAndGetValidFiles();
for (LSMComponentFileReferences lsmComonentFileReference : validFileReferences) {
- LSMInvertedIndexComponent component;
+ LSMInvertedIndexImmutableComponent component;
try {
component = createDiskInvIndexComponent(componentFactory,
lsmComonentFileReference.getInsertIndexFileReference(),
@@ -164,52 +159,15 @@
}
@Override
- public List<ILSMComponent> getOperationalComponents(IIndexOperationContext ctx) {
- List<ILSMComponent> operationalComponents = new ArrayList<ILSMComponent>();
- switch (ctx.getOperation()) {
- case SEARCH:
- // TODO: We should add the mutable component at some point.
- operationalComponents.addAll(immutableComponents);
- break;
- case MERGE:
- // TODO: determining the participating components in a merge should probably the task of the merge policy.
- if (immutableComponents.size() > 1) {
- for (ILSMComponent c : immutableComponents) {
- if (c.negativeCompareAndSet(LSMComponentState.MERGING, LSMComponentState.MERGING)) {
- operationalComponents.add(c);
- }
- }
- }
- break;
- default:
- throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
- }
- return operationalComponents;
- }
-
- protected LSMInvertedIndexComponent createDiskInvIndexComponent(ILSMComponentFactory factory,
- FileReference dictBTreeFileRef, FileReference btreeFileRef, boolean create) throws HyracksDataException,
- IndexException {
- LSMInvertedIndexComponent component = (LSMInvertedIndexComponent) factory
- .createLSMComponentInstance(new LSMComponentFileReferences(dictBTreeFileRef, btreeFileRef));
- if (create) {
- component.getInvIndex().create();
- component.getDeletedKeysBTree().create();
- }
- // Will be closed during cleanup of merge().
- component.getInvIndex().activate();
- component.getDeletedKeysBTree().activate();
- return component;
- }
-
- @Override
public void clear() throws HyracksDataException {
if (!isActivated) {
throw new HyracksDataException("Failed to clear the index since it is not activated.");
}
- resetMutableComponent();
+ List<ILSMComponent> immutableComponents = componentsRef.get();
+ mutableComponent.getInvIndex().clear();
+ mutableComponent.getDeletedKeysBTree().clear();
for (ILSMComponent c : immutableComponents) {
- LSMInvertedIndexComponent component = (LSMInvertedIndexComponent) c;
+ LSMInvertedIndexImmutableComponent component = (LSMInvertedIndexImmutableComponent) c;
component.getInvIndex().deactivate();
component.getDeletedKeysBTree().deactivate();
component.getInvIndex().destroy();
@@ -236,8 +194,9 @@
throw new HyracksDataException(e);
}
+ List<ILSMComponent> immutableComponents = componentsRef.get();
for (ILSMComponent c : immutableComponents) {
- LSMInvertedIndexComponent component = (LSMInvertedIndexComponent) c;
+ LSMInvertedIndexImmutableComponent component = (LSMInvertedIndexImmutableComponent) c;
component.getInvIndex().deactivate();
component.getDeletedKeysBTree().deactivate();
}
@@ -256,8 +215,9 @@
mutableComponent.getInvIndex().destroy();
mutableComponent.getDeletedKeysBTree().destroy();
+ List<ILSMComponent> immutableComponents = componentsRef.get();
for (ILSMComponent c : immutableComponents) {
- LSMInvertedIndexComponent component = (LSMInvertedIndexComponent) c;
+ LSMInvertedIndexImmutableComponent component = (LSMInvertedIndexImmutableComponent) c;
component.getInvIndex().destroy();
component.getDeletedKeysBTree().destroy();
}
@@ -265,34 +225,28 @@
}
@Override
- public void validate() throws HyracksDataException {
- mutableComponent.getInvIndex().validate();
- mutableComponent.getDeletedKeysBTree().validate();
- for (ILSMComponent c : immutableComponents) {
- LSMInvertedIndexComponent component = (LSMInvertedIndexComponent) c;
- component.getInvIndex().validate();
- component.getDeletedKeysBTree().validate();
+ public void getOperationalComponents(ILSMIndexOperationContext ctx) {
+ List<ILSMComponent> immutableComponents = componentsRef.get();
+ List<ILSMComponent> operationalComponents = ctx.getComponentHolder();
+ operationalComponents.clear();
+ switch (ctx.getOperation()) {
+ case FLUSH:
+ case DELETE:
+ case INSERT:
+ operationalComponents.add(mutableComponent);
+ break;
+ case SEARCH:
+ operationalComponents.add(mutableComponent);
+ operationalComponents.addAll(immutableComponents);
+ break;
+ case MERGE:
+ operationalComponents.addAll(immutableComponents);
+ break;
+ default:
+ throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
}
}
- @Override
- public IIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
- ISearchOperationCallback searchCallback) {
- return new LSMInvertedIndexAccessor(this, lsmHarness, fileManager, createOpContext(modificationCallback,
- searchCallback));
- }
-
- private LSMInvertedIndexOpContext createOpContext(IModificationOperationCallback modificationCallback,
- ISearchOperationCallback searchCallback) {
- return new LSMInvertedIndexOpContext(mutableComponent.getInvIndex(), mutableComponent.getDeletedKeysBTree(),
- modificationCallback, searchCallback);
- }
-
- @Override
- public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput) throws IndexException {
- return new LSMInvertedIndexBulkLoader(fillFactor, verifyInput);
- }
-
/**
* The keys in the in-memory deleted-keys BTree only refer to on-disk components.
* We delete documents from the in-memory inverted index by deleting its entries directly,
@@ -308,8 +262,7 @@
* - Insert key into deleted-keys BTree.
*/
@Override
- public void insertUpdateOrDelete(ITupleReference tuple, IIndexOperationContext ictx) throws HyracksDataException,
- IndexException {
+ public void modify(IIndexOperationContext ictx, ITupleReference tuple) throws HyracksDataException, IndexException {
LSMInvertedIndexOpContext ctx = (LSMInvertedIndexOpContext) ictx;
ctx.modificationCallback.before(tuple);
switch (ctx.getOperation()) {
@@ -338,9 +291,12 @@
}
@Override
- public void search(IIndexCursor cursor, List<ILSMComponent> immutableComponents, ISearchPredicate pred,
- IIndexOperationContext ictx, boolean includeMutableComponent) throws HyracksDataException, IndexException {
- int numComponents = (includeMutableComponent) ? immutableComponents.size() : immutableComponents.size() + 1;
+ public void search(ILSMIndexOperationContext ictx, IIndexCursor cursor, ISearchPredicate pred)
+ throws HyracksDataException, IndexException {
+ List<ILSMComponent> operationalComponents = ictx.getComponentHolder();
+ int numComponents = operationalComponents.size();
+ assert numComponents > 0;
+ boolean includeMutableComponent = operationalComponents.get(0) == mutableComponent;
ArrayList<IIndexAccessor> indexAccessors = new ArrayList<IIndexAccessor>(numComponents);
ArrayList<IIndexAccessor> deletedKeysBTreeAccessors = new ArrayList<IIndexAccessor>(numComponents);
if (includeMutableComponent) {
@@ -351,8 +307,10 @@
NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
deletedKeysBTreeAccessors.add(deletedKeysAccessor);
}
- for (int i = 0; i < immutableComponents.size(); i++) {
- LSMInvertedIndexComponent component = (LSMInvertedIndexComponent) immutableComponents.get(i);
+
+ for (int i = includeMutableComponent ? 1 : 0; i < operationalComponents.size(); i++) {
+ LSMInvertedIndexImmutableComponent component = (LSMInvertedIndexImmutableComponent) operationalComponents
+ .get(i);
IIndexAccessor invIndexAccessor = component.getInvIndex().createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
indexAccessors.add(invIndexAccessor);
@@ -369,12 +327,13 @@
private ICursorInitialState createCursorInitialState(ISearchPredicate pred, IIndexOperationContext ictx,
boolean includeMutableComponent, ArrayList<IIndexAccessor> indexAccessors,
ArrayList<IIndexAccessor> deletedKeysBTreeAccessors) {
+ List<ILSMComponent> immutableComponents = componentsRef.get();
ICursorInitialState initState = null;
PermutingTupleReference keysOnlyTuple = createKeysOnlyTupleReference();
MultiComparator keyCmp = MultiComparator.createIgnoreFieldLength(invListCmpFactories);
List<ILSMComponent> operationalComponents = new ArrayList<ILSMComponent>();
if (includeMutableComponent) {
- operationalComponents.add(getMutableComponent());
+ operationalComponents.add(mutableComponent);
}
operationalComponents.addAll(immutableComponents);
@@ -407,68 +366,17 @@
}
@Override
- public ILSMComponent merge(List<ILSMComponent> mergedComponents, ILSMIOOperation operation)
- throws HyracksDataException, IndexException {
- LSMInvertedIndexMergeOperation mergeOp = (LSMInvertedIndexMergeOperation) operation;
-
- // Create an inverted index instance.
- LSMInvertedIndexComponent component = createDiskInvIndexComponent(componentFactory,
- mergeOp.getDictBTreeMergeTarget(), mergeOp.getDeletedKeysBTreeMergeTarget(), true);
-
- IInvertedIndex mergedDiskInvertedIndex = component.getInvIndex();
- IIndexCursor cursor = mergeOp.getCursor();
- IIndexBulkLoader invIndexBulkLoader = mergedDiskInvertedIndex.createBulkLoader(1.0f, true);
- try {
- while (cursor.hasNext()) {
- cursor.next();
- ITupleReference tuple = cursor.getTuple();
- invIndexBulkLoader.add(tuple);
- }
- } finally {
- cursor.close();
- }
- invIndexBulkLoader.end();
-
- // Create an empty deleted keys BTree (do nothing with the returned index).
- BTree deletedKeysBTree = component.getDeletedKeysBTree();
-
- // Add the merged components for cleanup.
- mergedComponents.addAll(mergeOp.getMergingComponents());
-
- return new LSMInvertedIndexComponent(mergedDiskInvertedIndex, deletedKeysBTree);
- }
-
- @Override
- public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException,
- IndexException {
- LSMInvertedIndexOpContext ctx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- ctx.setOperation(IndexOperation.MERGE);
- IIndexCursor cursor = new LSMInvertedIndexRangeSearchCursor(ctx);
- RangePredicate mergePred = new RangePredicate(null, null, true, true, null, null);
-
- // Scan diskInvertedIndexes ignoring the memoryInvertedIndex.
- List<ILSMComponent> mergingComponents = lsmHarness.search(cursor, mergePred, ctx, false);
- if (mergingComponents.size() <= 1) {
- cursor.close();
- return null;
- }
-
- LSMInvertedIndexComponent firstComponent = (LSMInvertedIndexComponent) mergingComponents.get(0);
- OnDiskInvertedIndex firstInvIndex = (OnDiskInvertedIndex) firstComponent.getInvIndex();
- String firstFileName = firstInvIndex.getBTree().getFileReference().getFile().getName();
-
- LSMInvertedIndexComponent lastComponent = (LSMInvertedIndexComponent) mergingComponents.get(mergingComponents
- .size() - 1);
- OnDiskInvertedIndex lastInvIndex = (OnDiskInvertedIndex) lastComponent.getInvIndex();
- String lastFileName = lastInvIndex.getBTree().getFileReference().getFile().getName();
-
- LSMComponentFileReferences relMergeFileRefs = fileManager.getRelMergeFileReference(firstFileName, lastFileName);
- LSMInvertedIndexMergeOperation mergeOp = new LSMInvertedIndexMergeOperation(
- (ILSMIndexAccessorInternal) createAccessor(null, null), mergingComponents, cursor,
- relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getDeleteIndexFileReference(),
- callback);
-
- return mergeOp;
+ public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException {
+ LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
+ LSMInvertedIndexOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ ILSMComponent flushingComponent = ctx.getComponentHolder().get(0);
+ opCtx.setOperation(IndexOperation.FLUSH);
+ opCtx.getComponentHolder().add(flushingComponent);
+ ioScheduler.scheduleOperation(new LSMInvertedIndexFlushOperation(new LSMInvertedIndexAccessor(this, lsmHarness,
+ fileManager, opCtx), mutableComponent, componentFileRefs.getInsertIndexFileReference(),
+ componentFileRefs.getDeleteIndexFileReference(), callback));
}
@Override
@@ -476,12 +384,13 @@
LSMInvertedIndexFlushOperation flushOp = (LSMInvertedIndexFlushOperation) operation;
// Create an inverted index instance to be bulk loaded.
- LSMInvertedIndexComponent component = createDiskInvIndexComponent(componentFactory,
+ LSMInvertedIndexImmutableComponent component = createDiskInvIndexComponent(componentFactory,
flushOp.getDictBTreeFlushTarget(), flushOp.getDeletedKeysBTreeFlushTarget(), true);
IInvertedIndex diskInvertedIndex = component.getInvIndex();
// Create a scan cursor on the BTree underlying the in-memory inverted index.
- InMemoryInvertedIndexAccessor memInvIndexAccessor = (InMemoryInvertedIndexAccessor) mutableComponent
+ LSMInvertedIndexMutableComponent flushingComponent = flushOp.getFlushingComponent();
+ InMemoryInvertedIndexAccessor memInvIndexAccessor = (InMemoryInvertedIndexAccessor) flushingComponent
.getInvIndex().createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
BTreeAccessor memBTreeAccessor = memInvIndexAccessor.getBTreeAccessor();
RangePredicate nullPred = new RangePredicate(null, null, true, true, null, null);
@@ -504,7 +413,7 @@
BTree diskDeletedKeysBTree = component.getDeletedKeysBTree();
// Create a scan cursor on the deleted keys BTree underlying the in-memory inverted index.
- IIndexAccessor deletedKeysBTreeAccessor = mutableComponent.getDeletedKeysBTree().createAccessor(
+ IIndexAccessor deletedKeysBTreeAccessor = flushingComponent.getDeletedKeysBTree().createAccessor(
NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
IIndexCursor deletedKeysScanCursor = deletedKeysBTreeAccessor.createSearchCursor();
deletedKeysBTreeAccessor.search(deletedKeysScanCursor, nullPred);
@@ -521,7 +430,69 @@
}
deletedKeysBTreeBulkLoader.end();
- return new LSMInvertedIndexComponent(diskInvertedIndex, diskDeletedKeysBTree);
+ return new LSMInvertedIndexImmutableComponent(diskInvertedIndex, diskDeletedKeysBTree);
+ }
+
+ @Override
+ public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException, IndexException {
+ LSMInvertedIndexOpContext ictx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
+ ictx.getComponentHolder().addAll(mergingComponents);
+ IIndexCursor cursor = new LSMInvertedIndexRangeSearchCursor(ictx);
+ RangePredicate mergePred = new RangePredicate(null, null, true, true, null, null);
+
+ // Scan diskInvertedIndexes ignoring the memoryInvertedIndex.
+ search(ictx, cursor, mergePred);
+
+ ictx.setOperation(IndexOperation.MERGE);
+ LSMInvertedIndexImmutableComponent firstComponent = (LSMInvertedIndexImmutableComponent) mergingComponents
+ .get(0);
+ OnDiskInvertedIndex firstInvIndex = (OnDiskInvertedIndex) firstComponent.getInvIndex();
+ String firstFileName = firstInvIndex.getBTree().getFileReference().getFile().getName();
+
+ LSMInvertedIndexImmutableComponent lastComponent = (LSMInvertedIndexImmutableComponent) mergingComponents
+ .get(mergingComponents.size() - 1);
+ OnDiskInvertedIndex lastInvIndex = (OnDiskInvertedIndex) lastComponent.getInvIndex();
+ String lastFileName = lastInvIndex.getBTree().getFileReference().getFile().getName();
+
+ LSMComponentFileReferences relMergeFileRefs = fileManager.getRelMergeFileReference(firstFileName, lastFileName);
+ ILSMIndexAccessorInternal accessor = new LSMInvertedIndexAccessor(this, lsmHarness, fileManager, ictx);
+ ioScheduler.scheduleOperation(new LSMInvertedIndexMergeOperation(accessor, mergingComponents, cursor,
+ relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getDeleteIndexFileReference(),
+ callback));
+ }
+
+ @Override
+ public ILSMComponent merge(List<ILSMComponent> mergedComponents, ILSMIOOperation operation)
+ throws HyracksDataException, IndexException {
+ LSMInvertedIndexMergeOperation mergeOp = (LSMInvertedIndexMergeOperation) operation;
+
+ // Create an inverted index instance.
+ LSMInvertedIndexImmutableComponent component = createDiskInvIndexComponent(componentFactory,
+ mergeOp.getDictBTreeMergeTarget(), mergeOp.getDeletedKeysBTreeMergeTarget(), true);
+
+ IInvertedIndex mergedDiskInvertedIndex = component.getInvIndex();
+ IIndexCursor cursor = mergeOp.getCursor();
+ IIndexBulkLoader invIndexBulkLoader = mergedDiskInvertedIndex.createBulkLoader(1.0f, true);
+ try {
+ while (cursor.hasNext()) {
+ cursor.next();
+ ITupleReference tuple = cursor.getTuple();
+ invIndexBulkLoader.add(tuple);
+ }
+ } finally {
+ cursor.close();
+ }
+ invIndexBulkLoader.end();
+
+ // Create an empty deleted keys BTree (do nothing with the returned index).
+ BTree deletedKeysBTree = component.getDeletedKeysBTree();
+
+ // Add the merged components for cleanup.
+ mergedComponents.addAll(mergeOp.getMergingComponents());
+
+ return new LSMInvertedIndexImmutableComponent(mergedDiskInvertedIndex, deletedKeysBTree);
}
private ILSMComponent createBulkLoadTarget() throws HyracksDataException, IndexException {
@@ -530,6 +501,11 @@
componentFileRefs.getDeleteIndexFileReference(), true);
}
+ @Override
+ public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput) throws IndexException {
+ return new LSMInvertedIndexBulkLoader(fillFactor, verifyInput);
+ }
+
public class LSMInvertedIndexBulkLoader implements IIndexBulkLoader {
private final ILSMComponent component;
private final IIndexBulkLoader invIndexBulkLoader;
@@ -544,8 +520,8 @@
} catch (IndexException e) {
throw new TreeIndexException(e);
}
- invIndexBulkLoader = ((LSMInvertedIndexComponent) component).getInvIndex().createBulkLoader(fillFactor,
- verifyInput);
+ invIndexBulkLoader = ((LSMInvertedIndexImmutableComponent) component).getInvIndex().createBulkLoader(
+ fillFactor, verifyInput);
}
@Override
@@ -565,10 +541,10 @@
}
protected void handleException() throws HyracksDataException {
- ((LSMInvertedIndexComponent) component).getInvIndex().deactivate();
- ((LSMInvertedIndexComponent) component).getInvIndex().destroy();
- ((LSMInvertedIndexComponent) component).getDeletedKeysBTree().deactivate();
- ((LSMInvertedIndexComponent) component).getDeletedKeysBTree().destroy();
+ ((LSMInvertedIndexImmutableComponent) component).getInvIndex().deactivate();
+ ((LSMInvertedIndexImmutableComponent) component).getInvIndex().destroy();
+ ((LSMInvertedIndexImmutableComponent) component).getDeletedKeysBTree().deactivate();
+ ((LSMInvertedIndexImmutableComponent) component).getDeletedKeysBTree().destroy();
}
@Override
@@ -578,17 +554,38 @@
}
}
- @Override
- public void resetMutableComponent() throws HyracksDataException {
- memFreePageManager.reset();
- mutableComponent.getInvIndex().clear();
- mutableComponent.getDeletedKeysBTree().clear();
+ protected InMemoryInvertedIndex createInMemoryInvertedIndex(IInMemoryBufferCache memBufferCache)
+ throws IndexException {
+ return InvertedIndexUtils.createInMemoryBTreeInvertedindex(memBufferCache, memFreePageManager,
+ invListTypeTraits, invListCmpFactories, tokenTypeTraits, tokenCmpFactories, tokenizerFactory);
+ }
+
+ protected LSMInvertedIndexImmutableComponent createDiskInvIndexComponent(ILSMComponentFactory factory,
+ FileReference dictBTreeFileRef, FileReference btreeFileRef, boolean create) throws HyracksDataException,
+ IndexException {
+ LSMInvertedIndexImmutableComponent component = (LSMInvertedIndexImmutableComponent) factory
+ .createLSMComponentInstance(new LSMComponentFileReferences(dictBTreeFileRef, btreeFileRef));
+ if (create) {
+ component.getInvIndex().create();
+ component.getDeletedKeysBTree().create();
+ }
+ // Will be closed during cleanup of merge().
+ component.getInvIndex().activate();
+ component.getDeletedKeysBTree().activate();
+ return component;
}
@Override
- public long getMemoryAllocationSize() {
- InMemoryBufferCache memBufferCache = (InMemoryBufferCache) mutableComponent.getInvIndex().getBufferCache();
- return memBufferCache.getNumPages() * memBufferCache.getPageSize();
+ public ILSMIndexAccessorInternal createAccessor(IModificationOperationCallback modificationCallback,
+ ISearchOperationCallback searchCallback) {
+ return new LSMInvertedIndexAccessor(this, lsmHarness, fileManager, createOpContext(modificationCallback,
+ searchCallback));
+ }
+
+ private LSMInvertedIndexOpContext createOpContext(IModificationOperationCallback modificationCallback,
+ ISearchOperationCallback searchCallback) {
+ return new LSMInvertedIndexOpContext(mutableComponent.getInvIndex(), mutableComponent.getDeletedKeysBTree(),
+ modificationCallback, searchCallback);
}
@Override
@@ -613,6 +610,12 @@
}
@Override
+ public long getMemoryAllocationSize() {
+ InMemoryBufferCache memBufferCache = (InMemoryBufferCache) mutableComponent.getInvIndex().getBufferCache();
+ return memBufferCache.getNumPages() * memBufferCache.getPageSize();
+ }
+
+ @Override
public ITypeTraits[] getTokenTypeTraits() {
return tokenTypeTraits;
}
@@ -626,9 +629,17 @@
return tokenizerFactory;
}
+ protected void forceFlushInvListsFileDirtyPages(OnDiskInvertedIndex invIndex) throws HyracksDataException {
+ int fileId = invIndex.getInvListsFileId();
+ IBufferCache bufferCache = invIndex.getBufferCache();
+ int startPageId = 0;
+ int maxPageId = invIndex.getInvListsMaxPageId();
+ forceFlushDirtyPages(bufferCache, fileId, startPageId, maxPageId);
+ }
+
@Override
public void markAsValid(ILSMComponent lsmComponent) throws HyracksDataException {
- LSMInvertedIndexComponent invIndexComponent = (LSMInvertedIndexComponent) lsmComponent;
+ LSMInvertedIndexImmutableComponent invIndexComponent = (LSMInvertedIndexImmutableComponent) lsmComponent;
OnDiskInvertedIndex invIndex = (OnDiskInvertedIndex) invIndexComponent.getInvIndex();
ITreeIndex treeIndex = invIndex.getBTree();
// Flush inverted index first.
@@ -640,16 +651,15 @@
markAsValidInternal(treeIndex);
}
- protected void forceFlushInvListsFileDirtyPages(OnDiskInvertedIndex invIndex) throws HyracksDataException {
- int fileId = invIndex.getInvListsFileId();
- IBufferCache bufferCache = invIndex.getBufferCache();
- int startPageId = 0;
- int maxPageId = invIndex.getInvListsMaxPageId();
- forceFlushDirtyPages(bufferCache, fileId, startPageId, maxPageId);
- }
-
@Override
- public ILSMComponent getMutableComponent() {
- return mutableComponent;
+ public void validate() throws HyracksDataException {
+ mutableComponent.getInvIndex().validate();
+ mutableComponent.getDeletedKeysBTree().validate();
+ List<ILSMComponent> immutableComponents = componentsRef.get();
+ for (ILSMComponent c : immutableComponents) {
+ LSMInvertedIndexImmutableComponent component = (LSMInvertedIndexImmutableComponent) c;
+ component.getInvIndex().validate();
+ component.getDeletedKeysBTree().validate();
+ }
}
}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index 7faed0f..6ba6c09 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -27,7 +27,6 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
@@ -49,30 +48,30 @@
@Override
public void insert(ITupleReference tuple) throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.INSERT);
- lsmHarness.insertUpdateOrDelete(tuple, ctx, false);
+ lsmHarness.modify(ctx, false, tuple);
}
@Override
public void delete(ITupleReference tuple) throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.DELETE);
- lsmHarness.insertUpdateOrDelete(tuple, ctx, false);
+ lsmHarness.modify(ctx, false, tuple);
}
@Override
public boolean tryInsert(ITupleReference tuple) throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.INSERT);
- return lsmHarness.insertUpdateOrDelete(tuple, ctx, true);
+ return lsmHarness.modify(ctx, true, tuple);
}
@Override
public boolean tryDelete(ITupleReference tuple) throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.DELETE);
- return lsmHarness.insertUpdateOrDelete(tuple, ctx, true);
+ return lsmHarness.modify(ctx, true, tuple);
}
public void search(IIndexCursor cursor, ISearchPredicate searchPred) throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.SEARCH);
- lsmHarness.search(cursor, searchPred, ctx, true);
+ lsmHarness.search(ctx, cursor, searchPred);
}
public IIndexCursor createSearchCursor() {
@@ -81,29 +80,24 @@
@Override
public void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException {
- LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
- lsmHarness.getIOScheduler().scheduleOperation(
- new LSMInvertedIndexFlushOperation((ILSMIndexAccessorInternal) invIndex.createAccessor(null, null),
- componentFileRefs.getInsertIndexFileReference(), componentFileRefs
- .getDeleteIndexFileReference(), callback));
+ ctx.setOperation(IndexOperation.FLUSH);
+ lsmHarness.scheduleFlush(ctx, callback);
}
@Override
public void flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
- lsmHarness.flush(operation);
+ lsmHarness.flush(ctx, operation);
}
@Override
public void scheduleMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException {
- ILSMIOOperation op = lsmHarness.createMergeOperation(callback);
- if (op != null) {
- lsmHarness.getIOScheduler().scheduleOperation(op);
- }
+ ctx.setOperation(IndexOperation.MERGE);
+ lsmHarness.scheduleMerge(ctx, callback);
}
@Override
public void merge(ILSMIOOperation operation) throws HyracksDataException, IndexException {
- lsmHarness.merge(operation);
+ lsmHarness.merge(ctx, operation);
}
@Override
@@ -118,13 +112,8 @@
}
@Override
- public boolean tryNoOp() throws HyracksDataException {
- return lsmHarness.noOp(ctx, true);
- }
-
- @Override
public void noOp() throws HyracksDataException {
- lsmHarness.noOp(ctx, false);
+ lsmHarness.noOp(ctx);
}
@Override
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexComponentFactory.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexComponentFactory.java
index 23fc0ae..f856f90 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexComponentFactory.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexComponentFactory.java
@@ -36,9 +36,8 @@
@Override
public ILSMComponent createLSMComponentInstance(LSMComponentFileReferences cfr) throws IndexException {
- return new LSMInvertedIndexComponent(
- diskInvIndexFactory.createIndexInstance(cfr.getInsertIndexFileReference()),
- btreeFactory.createIndexInstance(cfr.getDeleteIndexFileReference()));
+ return new LSMInvertedIndexImmutableComponent(diskInvIndexFactory.createIndexInstance(cfr
+ .getInsertIndexFileReference()), btreeFactory.createIndexInstance(cfr.getDeleteIndexFileReference()));
}
@Override
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
index d10a056..fb55ce0 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
@@ -28,13 +28,16 @@
public class LSMInvertedIndexFlushOperation implements ILSMIOOperation {
private final ILSMIndexAccessorInternal accessor;
+ private final LSMInvertedIndexMutableComponent flushingComponent;
private final FileReference dictBTreeFlushTarget;
private final FileReference deletedKeysBTreeFlushTarget;
private final ILSMIOOperationCallback callback;
- public LSMInvertedIndexFlushOperation(ILSMIndexAccessorInternal accessor, FileReference dictBTreeFlushTarget,
+ public LSMInvertedIndexFlushOperation(ILSMIndexAccessorInternal accessor,
+ LSMInvertedIndexMutableComponent flushingComponent, FileReference dictBTreeFlushTarget,
FileReference deletedKeysBTreeFlushTarget, ILSMIOOperationCallback callback) {
this.accessor = accessor;
+ this.flushingComponent = flushingComponent;
this.dictBTreeFlushTarget = dictBTreeFlushTarget;
this.deletedKeysBTreeFlushTarget = deletedKeysBTreeFlushTarget;
this.callback = callback;
@@ -67,4 +70,8 @@
public FileReference getDeletedKeysBTreeFlushTarget() {
return deletedKeysBTreeFlushTarget;
}
+
+ public LSMInvertedIndexMutableComponent getFlushingComponent() {
+ return flushingComponent;
+ }
}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexImmutableComponent.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexImmutableComponent.java
new file mode 100644
index 0000000..5099a7b
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexImmutableComponent.java
@@ -0,0 +1,34 @@
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractImmutableLSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+
+public class LSMInvertedIndexImmutableComponent extends AbstractImmutableLSMComponent {
+
+ private final IInvertedIndex invIndex;
+ private final BTree deletedKeysBTree;
+
+ public LSMInvertedIndexImmutableComponent(IInvertedIndex invIndex, BTree deletedKeysBTree) {
+ this.invIndex = invIndex;
+ this.deletedKeysBTree = deletedKeysBTree;
+ }
+
+ @Override
+ public void destroy() throws HyracksDataException {
+ invIndex.deactivate();
+ invIndex.destroy();
+ deletedKeysBTree.deactivate();
+ deletedKeysBTree.destroy();
+ }
+
+ public IInvertedIndex getInvIndex() {
+ return invIndex;
+ }
+
+ public BTree getDeletedKeysBTree() {
+ return deletedKeysBTree;
+ }
+
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
index 9f7ea8d..63b604e 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
@@ -53,7 +53,7 @@
public Set<IODeviceHandle> getReadDevices() {
Set<IODeviceHandle> devs = new HashSet<IODeviceHandle>();
for (Object o : mergingComponents) {
- LSMInvertedIndexComponent component = (LSMInvertedIndexComponent) o;
+ LSMInvertedIndexImmutableComponent component = (LSMInvertedIndexImmutableComponent) o;
OnDiskInvertedIndex invIndex = (OnDiskInvertedIndex) component.getInvIndex();
devs.add(invIndex.getBTree().getFileReference().getDeviceHandle());
devs.add(component.getDeletedKeysBTree().getFileReference().getDeviceHandle());
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexComponent.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMutableComponent.java
similarity index 67%
rename from hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexComponent.java
rename to hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMutableComponent.java
index d9a65b9..c36319d 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexComponent.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMutableComponent.java
@@ -17,33 +17,21 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMComponent;
+import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractMutableLSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
-public class LSMInvertedIndexComponent extends AbstractLSMComponent {
+public class LSMInvertedIndexMutableComponent extends AbstractMutableLSMComponent {
private final IInvertedIndex invIndex;
private final BTree deletedKeysBTree;
+ private final IInMemoryFreePageManager mfpm;
- public LSMInvertedIndexComponent(IInvertedIndex invIndex, BTree deletedKeysBTree) {
+ public LSMInvertedIndexMutableComponent(IInvertedIndex invIndex, BTree deletedKeysBTree,
+ IInMemoryFreePageManager mfpm) {
this.invIndex = invIndex;
this.deletedKeysBTree = deletedKeysBTree;
- }
-
- @Override
- public void destroy() throws HyracksDataException {
- invIndex.deactivate();
- invIndex.destroy();
- deletedKeysBTree.deactivate();
- deletedKeysBTree.destroy();
- }
-
- @Override
- public void reset() throws HyracksDataException {
- ((InMemoryFreePageManager) deletedKeysBTree.getFreePageManager()).reset();
- invIndex.clear();
- deletedKeysBTree.clear();
+ this.mfpm = mfpm;
}
public IInvertedIndex getInvIndex() {
@@ -53,4 +41,15 @@
public BTree getDeletedKeysBTree() {
return deletedKeysBTree;
}
+
+ @Override
+ protected boolean isFull() {
+ return mfpm.isFull();
+ }
+
+ @Override
+ protected void reset() throws HyracksDataException {
+ invIndex.clear();
+ deletedKeysBTree.clear();
+ }
}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
index 625ecd4..b961b7a 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
@@ -15,6 +15,9 @@
package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+import java.util.LinkedList;
+import java.util.List;
+
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
@@ -22,6 +25,7 @@
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingTupleReference;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccessor;
@@ -29,17 +33,18 @@
public class LSMInvertedIndexOpContext implements ILSMIndexOperationContext {
private static final int NUM_DOCUMENT_FIELDS = 1;
-
+
private IndexOperation op;
private final IInvertedIndex memInvIndex;
private final IIndex memDeletedKeysBTree;
+ private final List<ILSMComponent> componentHolder;
public final IModificationOperationCallback modificationCallback;
public final ISearchOperationCallback searchCallback;
-
+
// Tuple that only has the inverted-index elements (aka keys), projecting away the document fields.
public PermutingTupleReference keysOnlyTuple;
-
+
// Accessor to the in-memory inverted index.
public IInvertedIndexAccessor memInvIndexAccessor;
// Accessor to the deleted-keys BTree.
@@ -49,17 +54,20 @@
IModificationOperationCallback modificationCallback, ISearchOperationCallback searchCallback) {
this.memInvIndex = memInvIndex;
this.memDeletedKeysBTree = memDeletedKeysBTree;
+ this.componentHolder = new LinkedList<ILSMComponent>();
this.modificationCallback = modificationCallback;
this.searchCallback = searchCallback;
}
@Override
public void reset() {
+ componentHolder.clear();
}
@Override
// TODO: Ignore opcallback for now.
public void setOperation(IndexOperation newOp) {
+ reset();
switch (newOp) {
case INSERT:
case DELETE:
@@ -82,13 +90,18 @@
}
op = newOp;
}
-
+
@Override
public IndexOperation getOperation() {
return op;
}
@Override
+ public List<ILSMComponent> getComponentHolder() {
+ return componentHolder;
+ }
+
+ @Override
public ISearchOperationCallback getSearchOperationCallback() {
return searchCallback;
}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java
index b6d3bb7..1ea66f7 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java
@@ -153,7 +153,7 @@
public void close() throws HyracksDataException {
reset();
accessorIndex = -1;
- harness.closeSearchCursor(operationalComponents, includeMemComponent, opCtx);
+ harness.endSearch(opCtx);
}
@Override
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index 440a5a7..046520d 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -16,7 +16,6 @@
package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
import java.io.File;
-import java.util.ArrayList;
import java.util.List;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -43,13 +42,13 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentState;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
@@ -63,7 +62,7 @@
protected final IBinaryComparatorFactory[] linearizerArray;
// In-memory components.
- protected final LSMRTreeComponent mutableComponent;
+ protected final LSMRTreeMutableComponent mutableComponent;
protected final IInMemoryBufferCache memBufferCache;
// This is used to estimate number of tuples in the memory RTree and BTree
@@ -102,7 +101,7 @@
BTree memBTree = new BTree(memBufferCache, ((InMemoryBufferCache) memBufferCache).getFileMapProvider(),
memFreePageManager, btreeInteriorFrameFactory, btreeLeafFrameFactory, btreeCmpFactories, fieldCount,
new FileReference(new File("membtree")));
- mutableComponent = new LSMRTreeComponent(memRTree, memBTree);
+ mutableComponent = new LSMRTreeMutableComponent(memRTree, memBTree, memFreePageManager);
this.memBufferCache = memBufferCache;
this.rtreeInteriorFrameFactory = rtreeInteriorFrameFactory;
this.rtreeLeafFrameFactory = rtreeLeafFrameFactory;
@@ -127,7 +126,7 @@
fileManager.deleteDirs();
fileManager.createDirs();
- immutableComponents.clear();
+ componentsRef.get().clear();
}
@Override
@@ -187,33 +186,33 @@
}
@Override
- public List<ILSMComponent> getOperationalComponents(IIndexOperationContext ctx) {
- List<ILSMComponent> operationalComponents = new ArrayList<ILSMComponent>();
+ public void getOperationalComponents(ILSMIndexOperationContext ctx) {
+ List<ILSMComponent> operationalComponents = ctx.getComponentHolder();
+ operationalComponents.clear();
+ List<ILSMComponent> immutableComponents = componentsRef.get();
switch (ctx.getOperation()) {
+ case INSERT:
+ case DELETE:
+ case FLUSH:
+ operationalComponents.add(mutableComponent);
+ break;
case SEARCH:
- // TODO: We should add the mutable component at some point.
+ operationalComponents.add(mutableComponent);
operationalComponents.addAll(immutableComponents);
break;
case MERGE:
- // TODO: determining the participating components in a merge should probably the task of the merge policy.
- if (immutableComponents.size() > 1) {
- for (ILSMComponent c : immutableComponents) {
- if (c.negativeCompareAndSet(LSMComponentState.MERGING, LSMComponentState.MERGING)) {
- operationalComponents.add(c);
- }
- }
- }
+ operationalComponents.addAll(immutableComponents);
break;
default:
throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
}
- return operationalComponents;
}
protected LSMComponentFileReferences getMergeTargetFileName(List<ILSMComponent> mergingDiskComponents)
throws HyracksDataException {
- RTree firstTree = ((LSMRTreeComponent) mergingDiskComponents.get(0)).getRTree();
- RTree lastTree = ((LSMRTreeComponent) mergingDiskComponents.get(mergingDiskComponents.size() - 1)).getRTree();
+ RTree firstTree = ((LSMRTreeImmutableComponent) mergingDiskComponents.get(0)).getRTree();
+ RTree lastTree = ((LSMRTreeImmutableComponent) mergingDiskComponents.get(mergingDiskComponents.size() - 1))
+ .getRTree();
FileReference firstFile = diskFileMapProvider.lookupFileName(firstTree.getFileId());
FileReference lastFile = diskFileMapProvider.lookupFileName(lastTree.getFileId());
LSMComponentFileReferences fileRefs = fileManager.getRelMergeFileReference(firstFile.getFile().getName(),
@@ -221,10 +220,10 @@
return fileRefs;
}
- protected LSMRTreeComponent createDiskComponent(ILSMComponentFactory factory, FileReference insertFileRef,
+ protected LSMRTreeImmutableComponent createDiskComponent(ILSMComponentFactory factory, FileReference insertFileRef,
FileReference deleteFileRef, boolean createComponent) throws HyracksDataException, IndexException {
// Create new tree instance.
- LSMRTreeComponent component = (LSMRTreeComponent) factory
+ LSMRTreeImmutableComponent component = (LSMRTreeImmutableComponent) factory
.createLSMComponentInstance(new LSMComponentFileReferences(insertFileRef, deleteFileRef));
if (createComponent) {
component.getRTree().create();
@@ -271,8 +270,7 @@
}
@Override
- public void insertUpdateOrDelete(ITupleReference tuple, IIndexOperationContext ictx) throws HyracksDataException,
- IndexException {
+ public void modify(IIndexOperationContext ictx, ITupleReference tuple) throws HyracksDataException, IndexException {
LSMRTreeOpContext ctx = (LSMRTreeOpContext) ictx;
if (ctx.getOperation() == IndexOperation.PHYSICALDELETE) {
throw new UnsupportedOperationException("Physical delete not yet supported in LSM R-tree");
@@ -323,15 +321,6 @@
}
}
- @Override
- public void resetMutableComponent() throws HyracksDataException {
- memFreePageManager.reset();
- mutableComponent.getRTree().clear();
- mutableComponent.getBTree().clear();
- memRTreeTuples = 0;
- memBTreeTuples = 0;
- }
-
protected LSMRTreeOpContext createOpContext() {
return new LSMRTreeOpContext((RTree.RTreeAccessor) mutableComponent.getRTree().createAccessor(
NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE),
@@ -349,7 +338,7 @@
}
public boolean isEmptyIndex() throws HyracksDataException {
- return immutableComponents.isEmpty()
+ return componentsRef.get().isEmpty()
&& mutableComponent.getBTree().isEmptyTree(
mutableComponent.getBTree().getInteriorFrameFactory().createFrame())
&& mutableComponent.getRTree().isEmptyTree(
@@ -366,9 +355,4 @@
InMemoryBufferCache memBufferCache = (InMemoryBufferCache) mutableComponent.getRTree().getBufferCache();
return memBufferCache.getNumPages() * memBufferCache.getPageSize();
}
-
- @Override
- public ILSMComponent getMutableComponent() {
- return mutableComponent;
- }
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 3e5869c..05efb97 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -26,10 +26,8 @@
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
@@ -89,6 +87,7 @@
@Override
public synchronized void activate() throws HyracksDataException {
super.activate();
+ List<ILSMComponent> immutableComponents = componentsRef.get();
List<LSMComponentFileReferences> validFileReferences;
try {
validFileReferences = fileManager.cleanupAndGetValidFiles();
@@ -97,7 +96,7 @@
}
immutableComponents.clear();
for (LSMComponentFileReferences lsmComonentFileReference : validFileReferences) {
- LSMRTreeComponent component;
+ LSMRTreeImmutableComponent component;
try {
component = createDiskComponent(componentFactory,
lsmComonentFileReference.getInsertIndexFileReference(),
@@ -113,8 +112,9 @@
@Override
public synchronized void deactivate() throws HyracksDataException {
super.deactivate();
+ List<ILSMComponent> immutableComponents = componentsRef.get();
for (ILSMComponent c : immutableComponents) {
- LSMRTreeComponent component = (LSMRTreeComponent) c;
+ LSMRTreeImmutableComponent component = (LSMRTreeImmutableComponent) c;
RTree rtree = component.getRTree();
BTree btree = component.getBTree();
rtree.deactivate();
@@ -126,8 +126,9 @@
@Override
public synchronized void destroy() throws HyracksDataException {
super.destroy();
+ List<ILSMComponent> immutableComponents = componentsRef.get();
for (ILSMComponent c : immutableComponents) {
- LSMRTreeComponent component = (LSMRTreeComponent) c;
+ LSMRTreeImmutableComponent component = (LSMRTreeImmutableComponent) c;
component.getBTree().destroy();
component.getRTree().destroy();
}
@@ -137,8 +138,9 @@
@Override
public synchronized void clear() throws HyracksDataException {
super.clear();
+ List<ILSMComponent> immutableComponents = componentsRef.get();
for (ILSMComponent c : immutableComponents) {
- LSMRTreeComponent component = (LSMRTreeComponent) c;
+ LSMRTreeImmutableComponent component = (LSMRTreeImmutableComponent) c;
component.getBTree().deactivate();
component.getRTree().deactivate();
component.getBTree().destroy();
@@ -148,12 +150,14 @@
}
@Override
- public void search(IIndexCursor cursor, List<ILSMComponent> immutableComponents, ISearchPredicate pred,
- IIndexOperationContext ictx, boolean includeMutableComponent) throws HyracksDataException, IndexException {
+ public void search(ILSMIndexOperationContext ictx, IIndexCursor cursor, ISearchPredicate pred)
+ throws HyracksDataException, IndexException {
LSMRTreeOpContext ctx = (LSMRTreeOpContext) ictx;
- int numDiskComponents = immutableComponents.size();
- int numTrees = (includeMutableComponent) ? numDiskComponents + 1 : numDiskComponents;
+ List<ILSMComponent> operationalComponents = ctx.getComponentHolder();
+ boolean includeMutableComponent = operationalComponents.get(0) == mutableComponent;
+ int numTrees = operationalComponents.size();
+ ListIterator<ILSMComponent> diskComponentIter = operationalComponents.listIterator();
ITreeIndexAccessor[] rTreeAccessors = new ITreeIndexAccessor[numTrees];
ITreeIndexAccessor[] bTreeAccessors = new ITreeIndexAccessor[numTrees];
int diskComponentIx = 0;
@@ -161,11 +165,11 @@
rTreeAccessors[0] = ctx.memRTreeAccessor;
bTreeAccessors[0] = ctx.memBTreeAccessor;
diskComponentIx++;
+ diskComponentIter.next();
}
- ListIterator<ILSMComponent> diskComponentIter = immutableComponents.listIterator();
while (diskComponentIter.hasNext()) {
- LSMRTreeComponent component = (LSMRTreeComponent) diskComponentIter.next();
+ LSMRTreeImmutableComponent component = (LSMRTreeImmutableComponent) diskComponentIter.next();
RTree diskRTree = component.getRTree();
BTree diskBTree = component.getBTree();
rTreeAccessors[diskComponentIx] = diskRTree.createAccessor(NoOpOperationCallback.INSTANCE,
@@ -175,11 +179,11 @@
diskComponentIx++;
}
- List<ILSMComponent> operationalComponents = new ArrayList<ILSMComponent>();
- if (includeMutableComponent) {
- operationalComponents.add(getMutableComponent());
- }
- operationalComponents.addAll(immutableComponents);
+ List<ILSMComponent> searchComponents = new ArrayList<ILSMComponent>();
+ // if (includeMutableComponent) {
+ // searchComponents.add(mutableComponent);
+ // }
+ // searchComponents.addAll(operationalComponents);
LSMRTreeCursorInitialState initialState = new LSMRTreeCursorInitialState(numTrees, rtreeLeafFrameFactory,
rtreeInteriorFrameFactory, btreeLeafFrameFactory, ctx.getBTreeMultiComparator(), rTreeAccessors,
bTreeAccessors, includeMutableComponent, lsmHarness, comparatorFields, linearizerArray,
@@ -188,19 +192,33 @@
}
@Override
+ public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException {
+ LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
+ ILSMIndexOperationContext rctx = createOpContext();
+ LSMRTreeMutableComponent flushingComponent = (LSMRTreeMutableComponent) ctx.getComponentHolder().get(0);
+ rctx.setOperation(IndexOperation.FLUSH);
+ rctx.getComponentHolder().addAll(ctx.getComponentHolder());
+ LSMRTreeAccessor accessor = new LSMRTreeAccessor(lsmHarness, rctx);
+ ioScheduler.scheduleOperation(new LSMRTreeFlushOperation(accessor, flushingComponent, componentFileRefs
+ .getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(), callback));
+ }
+
+ @Override
public ILSMComponent flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
LSMRTreeFlushOperation flushOp = (LSMRTreeFlushOperation) operation;
+ LSMRTreeMutableComponent flushingComponent = flushOp.getFlushingComponent();
// Renaming order is critical because we use assume ordering when we
// read the file names when we open the tree.
// The RTree should be renamed before the BTree.
// scan the memory RTree
- ITreeIndexAccessor memRTreeAccessor = mutableComponent.getRTree().createAccessor(
+ ITreeIndexAccessor memRTreeAccessor = flushingComponent.getRTree().createAccessor(
NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
RTreeSearchCursor rtreeScanCursor = (RTreeSearchCursor) memRTreeAccessor.createSearchCursor();
SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
- LSMRTreeComponent component = createDiskComponent(componentFactory, flushOp.getRTreeFlushTarget(),
+ LSMRTreeImmutableComponent component = createDiskComponent(componentFactory, flushOp.getRTreeFlushTarget(),
flushOp.getBTreeFlushTarget(), true);
RTree diskRTree = component.getRTree();
IIndexBulkLoader rTreeBulkloader;
@@ -209,9 +227,9 @@
IBinaryComparatorFactory[] linearizerArray = { linearizer };
if (rTreeTupleSorter == null) {
- rTreeTupleSorter = new TreeTupleSorter(memRTreeTuples, mutableComponent.getRTree().getFileId(),
+ rTreeTupleSorter = new TreeTupleSorter(memRTreeTuples, flushingComponent.getRTree().getFileId(),
linearizerArray, rtreeLeafFrameFactory.createFrame(), rtreeLeafFrameFactory.createFrame(),
- mutableComponent.getRTree().getBufferCache(), comparatorFields);
+ flushingComponent.getRTree().getBufferCache(), comparatorFields);
} else {
rTreeTupleSorter.reset();
}
@@ -248,7 +266,7 @@
rTreeBulkloader.end();
// scan the memory BTree
- ITreeIndexAccessor memBTreeAccessor = mutableComponent.getBTree().createAccessor(
+ ITreeIndexAccessor memBTreeAccessor = flushingComponent.getBTree().createAccessor(
NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
IIndexCursor btreeScanCursor = memBTreeAccessor.createSearchCursor();
RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null, null);
@@ -267,7 +285,30 @@
btreeScanCursor.close();
}
bTreeBulkloader.end();
- return new LSMRTreeComponent(diskRTree, diskBTree);
+ memRTreeTuples = 0;
+ memBTreeTuples = 0;
+ return new LSMRTreeImmutableComponent(diskRTree, diskBTree);
+ }
+
+ @Override
+ public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException, IndexException {
+ // Renaming order is critical because we use assume ordering when we
+ // read the file names when we open the tree.
+ // The RTree should be renamed before the BTree.
+ List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
+ ILSMIndexOperationContext rctx = createOpContext();
+ rctx.getComponentHolder().addAll(mergingComponents);
+ ITreeIndexCursor cursor = new LSMRTreeSortedCursor(rctx, linearizer);
+ ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
+ search(rctx, cursor, rtreeSearchPred);
+
+ rctx.setOperation(IndexOperation.MERGE);
+ LSMComponentFileReferences relMergeFileRefs = getMergeTargetFileName(mergingComponents);
+ ILSMIndexAccessorInternal accessor = new LSMRTreeAccessor(lsmHarness, rctx);
+ ioScheduler.scheduleOperation(new LSMRTreeMergeOperation((ILSMIndexAccessorInternal) accessor,
+ mergingComponents, cursor, relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs
+ .getDeleteIndexFileReference(), callback));
}
@Override
@@ -284,7 +325,7 @@
}
// Bulk load the tuples from all on-disk RTrees into the new RTree.
- LSMRTreeComponent component = createDiskComponent(componentFactory, mergeOp.getRTreeMergeTarget(),
+ LSMRTreeImmutableComponent component = createDiskComponent(componentFactory, mergeOp.getRTreeMergeTarget(),
mergeOp.getBTreeMergeTarget(), true);
RTree mergedRTree = component.getRTree();
BTree mergedBTree = component.getBTree();
@@ -304,39 +345,11 @@
// Load an empty BTree tree.
mergedBTree.createBulkLoader(1.0f, false).end();
- return new LSMRTreeComponent(mergedRTree, mergedBTree);
+ return new LSMRTreeImmutableComponent(mergedRTree, mergedBTree);
}
@Override
- public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException {
- // Renaming order is critical because we use assume ordering when we
- // read the file names when we open the tree.
- // The RTree should be renamed before the BTree.
- ILSMIndexOperationContext ctx = createOpContext();
- ctx.setOperation(IndexOperation.MERGE);
- ITreeIndexCursor cursor;
- cursor = new LSMRTreeSortedCursor(ctx, linearizer);
- ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
- // Scan the RTrees, ignoring the in-memory RTree.
- List<ILSMComponent> mergingComponents;
- try {
- mergingComponents = lsmHarness.search(cursor, rtreeSearchPred, ctx, false);
- } catch (IndexException e) {
- throw new HyracksDataException(e);
- }
- // Nothing to merge.
- if (mergingComponents.size() <= 1) {
- cursor.close();
- return null;
- }
- LSMComponentFileReferences relMergeFileRefs = getMergeTargetFileName(mergingComponents);
- return new LSMRTreeMergeOperation((ILSMIndexAccessorInternal) createAccessor(null, null), mergingComponents,
- cursor, relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getDeleteIndexFileReference(),
- callback);
- }
-
- @Override
- public IIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
+ public ILSMIndexAccessorInternal createAccessor(IModificationOperationCallback modificationCallback,
ISearchOperationCallback searchCallback) {
return new LSMRTreeAccessor(lsmHarness, createOpContext());
}
@@ -355,15 +368,6 @@
LSMRTreeOpContext concreteCtx = (LSMRTreeOpContext) ctx;
return concreteCtx.rtreeOpContext.cmp;
}
-
- @Override
- public void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException {
- LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
- lsmHarness.getIOScheduler().scheduleOperation(
- new LSMRTreeFlushOperation((ILSMIndexAccessorInternal) createAccessor(null, null),
- componentFileRefs.getInsertIndexFileReference(), componentFileRefs
- .getDeleteIndexFileReference(), callback));
- }
}
private ILSMComponent createBulkLoadTarget() throws HyracksDataException, IndexException {
@@ -391,7 +395,7 @@
} catch (IndexException e) {
throw new TreeIndexException(e);
}
- bulkLoader = ((LSMRTreeComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput);
+ bulkLoader = ((LSMRTreeImmutableComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput);
}
@Override
@@ -417,16 +421,16 @@
}
protected void handleException() throws HyracksDataException {
- ((LSMRTreeComponent) component).getRTree().deactivate();
- ((LSMRTreeComponent) component).getRTree().destroy();
- ((LSMRTreeComponent) component).getBTree().deactivate();
- ((LSMRTreeComponent) component).getBTree().destroy();
+ ((LSMRTreeImmutableComponent) component).getRTree().deactivate();
+ ((LSMRTreeImmutableComponent) component).getRTree().destroy();
+ ((LSMRTreeImmutableComponent) component).getBTree().deactivate();
+ ((LSMRTreeImmutableComponent) component).getBTree().destroy();
}
}
@Override
public void markAsValid(ILSMComponent lsmComponent) throws HyracksDataException {
- LSMRTreeComponent component = (LSMRTreeComponent) lsmComponent;
+ LSMRTreeImmutableComponent component = (LSMRTreeImmutableComponent) lsmComponent;
forceFlushDirtyPages(component.getRTree());
markAsValidInternal(component.getRTree());
forceFlushDirtyPages(component.getBTree());
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java
index e5af9ee..41ffd3d 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java
@@ -105,7 +105,7 @@
rtreeCursors = null;
btreeCursors = null;
} finally {
- lsmHarness.closeSearchCursor(operationalComponents, includeMemRTree, opCtx);
+ lsmHarness.endSearch(opCtx);
}
open = false;
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeComponentFactory.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeComponentFactory.java
index b875c67..1681fbd 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeComponentFactory.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeComponentFactory.java
@@ -35,7 +35,7 @@
@Override
public ILSMComponent createLSMComponentInstance(LSMComponentFileReferences cfr) throws IndexException {
- return new LSMRTreeComponent(rtreeFactory.createIndexInstance(cfr.getInsertIndexFileReference()),
+ return new LSMRTreeImmutableComponent(rtreeFactory.createIndexInstance(cfr.getInsertIndexFileReference()),
btreeFactory.createIndexInstance(cfr.getDeleteIndexFileReference()));
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
index f0ea892..8698a1d 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
@@ -15,13 +15,15 @@
public class LSMRTreeFlushOperation implements ILSMIOOperation {
private final ILSMIndexAccessorInternal accessor;
+ private final LSMRTreeMutableComponent flushingComponent;
private final FileReference rtreeFlushTarget;
private final FileReference btreeFlushTarget;
private final ILSMIOOperationCallback callback;
- public LSMRTreeFlushOperation(ILSMIndexAccessorInternal accessor, FileReference rtreeFlushTarget,
- FileReference btreeFlushTarget, ILSMIOOperationCallback callback) {
+ public LSMRTreeFlushOperation(ILSMIndexAccessorInternal accessor, LSMRTreeMutableComponent flushingComponent,
+ FileReference rtreeFlushTarget, FileReference btreeFlushTarget, ILSMIOOperationCallback callback) {
this.accessor = accessor;
+ this.flushingComponent = flushingComponent;
this.rtreeFlushTarget = rtreeFlushTarget;
this.btreeFlushTarget = btreeFlushTarget;
this.callback = callback;
@@ -57,4 +59,8 @@
public FileReference getBTreeFlushTarget() {
return btreeFlushTarget;
}
+
+ public LSMRTreeMutableComponent getFlushingComponent() {
+ return flushingComponent;
+ }
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeImmutableComponent.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeImmutableComponent.java
new file mode 100644
index 0000000..afba3a0
--- /dev/null
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeImmutableComponent.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractImmutableLSMComponent;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
+
+public class LSMRTreeImmutableComponent extends AbstractImmutableLSMComponent {
+ private final RTree rtree;
+ private final BTree btree;
+
+ public LSMRTreeImmutableComponent(RTree rtree, BTree btree) {
+ this.rtree = rtree;
+ this.btree = btree;
+ }
+
+ @Override
+ public void destroy() throws HyracksDataException {
+ rtree.deactivate();
+ rtree.destroy();
+ if (btree != null) {
+ btree.deactivate();
+ btree.destroy();
+ }
+ }
+
+ public RTree getRTree() {
+ return rtree;
+ }
+
+ public BTree getBTree() {
+ return btree;
+ }
+
+}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
index 368fc97..970b253 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
@@ -38,7 +38,7 @@
public Set<IODeviceHandle> getReadDevices() {
Set<IODeviceHandle> devs = new HashSet<IODeviceHandle>();
for (ILSMComponent o : mergingComponents) {
- LSMRTreeComponent component = (LSMRTreeComponent) o;
+ LSMRTreeImmutableComponent component = (LSMRTreeImmutableComponent) o;
devs.add(component.getRTree().getFileReference().getDeviceHandle());
devs.add(component.getBTree().getFileReference().getDeviceHandle());
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeComponent.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMutableComponent.java
similarity index 67%
rename from hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeComponent.java
rename to hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMutableComponent.java
index 4b53055..80f76a1 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeComponent.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMutableComponent.java
@@ -17,37 +17,20 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMComponent;
+import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractMutableLSMComponent;
import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
-public class LSMRTreeComponent extends AbstractLSMComponent {
+public class LSMRTreeMutableComponent extends AbstractMutableLSMComponent {
private final RTree rtree;
private final BTree btree;
+ private final IInMemoryFreePageManager mfpm;
- public LSMRTreeComponent(RTree rtree, BTree btree) {
+ public LSMRTreeMutableComponent(RTree rtree, BTree btree, IInMemoryFreePageManager mfpm) {
this.rtree = rtree;
this.btree = btree;
- }
-
- @Override
- public void destroy() throws HyracksDataException {
- rtree.deactivate();
- rtree.destroy();
- if (btree != null) {
- btree.deactivate();
- btree.destroy();
- }
- }
-
- @Override
- public void reset() throws HyracksDataException {
- ((InMemoryFreePageManager) rtree.getFreePageManager()).reset();
- rtree.clear();
- if (btree != null) {
- btree.clear();
- }
+ this.mfpm = mfpm;
}
public RTree getRTree() {
@@ -57,4 +40,17 @@
public BTree getBTree() {
return btree;
}
+
+ @Override
+ protected boolean isFull() {
+ return mfpm.isFull();
+ }
+
+ @Override
+ protected void reset() throws HyracksDataException {
+ rtree.clear();
+ if (btree != null) {
+ btree.clear();
+ }
+ }
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
index 7dc3107..b8805d1 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
@@ -15,6 +15,9 @@
package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+import java.util.LinkedList;
+import java.util.List;
+
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeOpContext;
@@ -25,6 +28,7 @@
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
@@ -38,6 +42,7 @@
public final RTree.RTreeAccessor memRTreeAccessor;
public final BTree.BTreeAccessor memBTreeAccessor;
private IndexOperation op;
+ public final List<ILSMComponent> componentHolder;
public final IModificationOperationCallback modificationCallback;
public final ISearchOperationCallback searchCallback;
@@ -49,6 +54,7 @@
IModificationOperationCallback modificationCallback, ISearchOperationCallback searchCallback) {
this.memRTreeAccessor = memRtreeAccessor;
this.memBTreeAccessor = memBtreeAccessor;
+ this.componentHolder = new LinkedList<ILSMComponent>();
this.modificationCallback = modificationCallback;
this.searchCallback = searchCallback;
this.rtreeOpContext = new RTreeOpContext(rtreeLeafFrame, rtreeInteriorFrame, rtreeMetaFrame, rtreeCmpFactories,
@@ -58,6 +64,7 @@
}
public void setOperation(IndexOperation newOp) {
+ reset();
if (newOp == IndexOperation.INSERT) {
rtreeOpContext.setOperation(newOp);
} else if (newOp == IndexOperation.DELETE) {
@@ -68,7 +75,7 @@
@Override
public void reset() {
-
+ componentHolder.clear();
}
@Override
@@ -81,6 +88,11 @@
}
@Override
+ public List<ILSMComponent> getComponentHolder() {
+ return componentHolder;
+ }
+
+ @Override
public ISearchOperationCallback getSearchOperationCallback() {
return searchCallback;
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
index 53f467b..f5e6f3d 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
@@ -48,7 +48,7 @@
rtreeCursors = null;
btreeCursors = null;
} finally {
- lsmHarness.closeSearchCursor(operationalComponents, includeMemRTree, opCtx);
+ lsmHarness.endSearch(opCtx);
}
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
index 9a4db5f..46cf050 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
@@ -57,7 +57,7 @@
}
} finally {
if (open) {
- lsmHarness.closeSearchCursor(operationalComponents, includeMemRTree, opCtx);
+ lsmHarness.endSearch(opCtx);
}
}
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 703cd21..f1b3931 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -15,7 +15,6 @@
package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
-import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
@@ -26,10 +25,8 @@
import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
@@ -92,6 +89,7 @@
@Override
public synchronized void activate() throws HyracksDataException {
super.activate();
+ List<ILSMComponent> immutableComponents = componentsRef.get();
immutableComponents.clear();
List<LSMComponentFileReferences> validFileReferences;
try {
@@ -100,7 +98,7 @@
throw new HyracksDataException(e);
}
for (LSMComponentFileReferences lsmComonentFileReference : validFileReferences) {
- LSMRTreeComponent component;
+ LSMRTreeImmutableComponent component;
try {
component = createDiskComponent(componentFactory,
lsmComonentFileReference.getInsertIndexFileReference(), null, false);
@@ -115,8 +113,9 @@
@Override
public synchronized void deactivate() throws HyracksDataException {
super.deactivate();
+ List<ILSMComponent> immutableComponents = componentsRef.get();
for (ILSMComponent c : immutableComponents) {
- RTree rtree = (RTree) ((LSMRTreeComponent) c).getRTree();
+ RTree rtree = (RTree) ((LSMRTreeImmutableComponent) c).getRTree();
rtree.deactivate();
}
isActivated = false;
@@ -125,8 +124,9 @@
@Override
public synchronized void destroy() throws HyracksDataException {
super.destroy();
+ List<ILSMComponent> immutableComponents = componentsRef.get();
for (ILSMComponent c : immutableComponents) {
- RTree rtree = (RTree) ((LSMRTreeComponent) c).getRTree();
+ RTree rtree = (RTree) ((LSMRTreeImmutableComponent) c).getRTree();
rtree.destroy();
}
fileManager.deleteDirs();
@@ -135,8 +135,9 @@
@Override
public synchronized void clear() throws HyracksDataException {
super.clear();
+ List<ILSMComponent> immutableComponents = componentsRef.get();
for (ILSMComponent c : immutableComponents) {
- RTree rtree = (RTree) ((LSMRTreeComponent) c).getRTree();
+ RTree rtree = (RTree) ((LSMRTreeImmutableComponent) c).getRTree();
rtree.deactivate();
rtree.destroy();
}
@@ -144,11 +145,13 @@
}
@Override
- public void search(IIndexCursor cursor, List<ILSMComponent> immutableComponents, ISearchPredicate pred,
- IIndexOperationContext ictx, boolean includeMutableComponent) throws HyracksDataException, IndexException {
+ public void search(ILSMIndexOperationContext ictx, IIndexCursor cursor, ISearchPredicate pred)
+ throws HyracksDataException, IndexException {
LSMRTreeOpContext ctx = (LSMRTreeOpContext) ictx;
+ List<ILSMComponent> operationalComponents = ictx.getComponentHolder();
+ boolean includeMutableComponent = operationalComponents.get(0) == mutableComponent;
LSMRTreeWithAntiMatterTuplesSearchCursor lsmTreeCursor = (LSMRTreeWithAntiMatterTuplesSearchCursor) cursor;
- int numDiskRComponents = immutableComponents.size();
+ int numDiskRComponents = operationalComponents.size();
LSMRTreeCursorInitialState initialState;
ITreeIndexAccessor[] bTreeAccessors = null;
@@ -158,11 +161,6 @@
bTreeAccessors[0] = ctx.memBTreeAccessor;
}
- List<ILSMComponent> operationalComponents = new ArrayList<ILSMComponent>();
- if (includeMutableComponent) {
- operationalComponents.add(getMutableComponent());
- }
- operationalComponents.addAll(immutableComponents);
initialState = new LSMRTreeCursorInitialState(numDiskRComponents, rtreeLeafFrameFactory,
rtreeInteriorFrameFactory, btreeLeafFrameFactory, ctx.getBTreeMultiComparator(), null, bTreeAccessors,
includeMutableComponent, lsmHarness, comparatorFields, linearizerArray, ctx.searchCallback,
@@ -170,17 +168,19 @@
lsmTreeCursor.open(initialState, pred);
+ ListIterator<ILSMComponent> diskComponentsIter = operationalComponents.listIterator();
+ int diskComponentIx = 0;
if (includeMutableComponent) {
// Open cursor of in-memory RTree
ctx.memRTreeAccessor.search(lsmTreeCursor.getMemRTreeCursor(), pred);
+ diskComponentIx++;
+ diskComponentsIter.next();
}
// Open cursors of on-disk RTrees.
ITreeIndexAccessor[] diskRTreeAccessors = new ITreeIndexAccessor[numDiskRComponents];
- int diskComponentIx = 0;
- ListIterator<ILSMComponent> diskComponentsIter = immutableComponents.listIterator();
while (diskComponentsIter.hasNext()) {
- RTree diskRTree = (RTree) ((LSMRTreeComponent) diskComponentsIter.next()).getRTree();
+ RTree diskRTree = (RTree) ((LSMRTreeImmutableComponent) diskComponentsIter.next()).getRTree();
diskRTreeAccessors[diskComponentIx] = diskRTree.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
diskRTreeAccessors[diskComponentIx].search(lsmTreeCursor.getCursor(diskComponentIx), pred);
@@ -190,23 +190,36 @@
}
@Override
+ public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException {
+ LSMRTreeOpContext opCtx = createOpContext();
+ LSMComponentFileReferences relFlushFileRefs = fileManager.getRelFlushFileReference();
+ ILSMComponent flushingComponent = ctx.getComponentHolder().get(0);
+ opCtx.setOperation(IndexOperation.FLUSH);
+ opCtx.getComponentHolder().add(flushingComponent);
+ ILSMIndexAccessorInternal accessor = new LSMRTreeWithAntiMatterTuplesAccessor(lsmHarness, opCtx);
+ ioScheduler.scheduleOperation(new LSMFlushOperation(accessor, flushingComponent, relFlushFileRefs
+ .getInsertIndexFileReference(), callback));
+ }
+
+ @Override
public ILSMComponent flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
LSMFlushOperation flushOp = (LSMFlushOperation) operation;
// Renaming order is critical because we use assume ordering when we
// read the file names when we open the tree.
// The RTree should be renamed before the BTree.
-
- // scan the memory RTree
- ITreeIndexAccessor memRTreeAccessor = mutableComponent.getRTree().createAccessor(
+ LSMRTreeMutableComponent flushingComponent = (LSMRTreeMutableComponent) flushOp.getFlushingComponent();
+ ITreeIndexAccessor memRTreeAccessor = flushingComponent.getRTree().createAccessor(
NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
RTreeSearchCursor rtreeScanCursor = (RTreeSearchCursor) memRTreeAccessor.createSearchCursor();
SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
- LSMRTreeComponent component = createDiskComponent(componentFactory, flushOp.getFlushTarget(), null, true);
+ LSMRTreeImmutableComponent component = createDiskComponent(componentFactory, flushOp.getFlushTarget(), null,
+ true);
RTree diskRTree = component.getRTree();
// scan the memory BTree
- ITreeIndexAccessor memBTreeAccessor = mutableComponent.getBTree().createAccessor(
+ ITreeIndexAccessor memBTreeAccessor = flushingComponent.getBTree().createAccessor(
NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
BTreeRangeSearchCursor btreeScanCursor = (BTreeRangeSearchCursor) memBTreeAccessor.createSearchCursor();
RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null, null);
@@ -215,13 +228,13 @@
// Since the LSM-RTree is used as a secondary assumption, the
// primary key will be the last comparator in the BTree comparators
if (rTreeTupleSorter == null) {
- rTreeTupleSorter = new TreeTupleSorter(memRTreeTuples, mutableComponent.getRTree().getFileId(),
+ rTreeTupleSorter = new TreeTupleSorter(memRTreeTuples, flushingComponent.getRTree().getFileId(),
linearizerArray, rtreeLeafFrameFactory.createFrame(), rtreeLeafFrameFactory.createFrame(),
- mutableComponent.getRTree().getBufferCache(), comparatorFields);
+ flushingComponent.getRTree().getBufferCache(), comparatorFields);
- bTreeTupleSorter = new TreeTupleSorter(memBTreeTuples, mutableComponent.getBTree().getFileId(),
+ bTreeTupleSorter = new TreeTupleSorter(memBTreeTuples, flushingComponent.getBTree().getFileId(),
linearizerArray, btreeLeafFrameFactory.createFrame(), btreeLeafFrameFactory.createFrame(),
- mutableComponent.getBTree().getBufferCache(), comparatorFields);
+ flushingComponent.getBTree().getBufferCache(), comparatorFields);
} else {
rTreeTupleSorter.reset();
bTreeTupleSorter.reset();
@@ -279,10 +292,28 @@
rTreeBulkloader.end();
+ memRTreeTuples = 0;
+ memBTreeTuples = 0;
return component;
}
@Override
+ public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException, IndexException {
+ List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
+ LSMRTreeOpContext rctx = createOpContext();
+ rctx.getComponentHolder().addAll(mergingComponents);
+ ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor(ctx);
+ ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
+ search(rctx, cursor, (SearchPredicate) rtreeSearchPred);
+ rctx.setOperation(IndexOperation.MERGE);
+ LSMComponentFileReferences relMergeFileRefs = getMergeTargetFileName(mergingComponents);
+ ILSMIndexAccessorInternal accessor = new LSMRTreeWithAntiMatterTuplesAccessor(lsmHarness, rctx);
+ ioScheduler.scheduleOperation(new LSMRTreeMergeOperation(accessor, mergingComponents, cursor, relMergeFileRefs
+ .getInsertIndexFileReference(), null, callback));
+ }
+
+ @Override
public ILSMComponent merge(List<ILSMComponent> mergedComponents, ILSMIOOperation operation)
throws HyracksDataException, IndexException {
LSMRTreeMergeOperation mergeOp = (LSMRTreeMergeOperation) operation;
@@ -296,7 +327,8 @@
}
// Bulk load the tuples from all on-disk RTrees into the new RTree.
- LSMRTreeComponent component = createDiskComponent(componentFactory, mergeOp.getRTreeMergeTarget(), null, true);
+ LSMRTreeImmutableComponent component = createDiskComponent(componentFactory, mergeOp.getRTreeMergeTarget(),
+ null, true);
RTree mergedRTree = component.getRTree();
IIndexBulkLoader bulkloader = mergedRTree.createBulkLoader(1.0f, false);
try {
@@ -313,7 +345,7 @@
}
@Override
- public IIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
+ public ILSMIndexAccessorInternal createAccessor(IModificationOperationCallback modificationCallback,
ISearchOperationCallback searchCallback) {
return new LSMRTreeWithAntiMatterTuplesAccessor(lsmHarness, createOpContext());
}
@@ -332,14 +364,6 @@
LSMRTreeOpContext concreteCtx = (LSMRTreeOpContext) ctx;
return concreteCtx.rtreeOpContext.cmp;
}
-
- @Override
- public void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException {
- LSMComponentFileReferences relFlushFileRefs = fileManager.getRelFlushFileReference();
- lsmHarness.getIOScheduler().scheduleOperation(
- new LSMFlushOperation((ILSMIndexAccessorInternal) createAccessor(null, null), relFlushFileRefs.getInsertIndexFileReference(),
- callback));
- }
}
@Override
@@ -366,7 +390,7 @@
} catch (IndexException e) {
throw new TreeIndexException(e);
}
- bulkLoader = ((LSMRTreeComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput);
+ bulkLoader = ((LSMRTreeImmutableComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput);
}
@Override
@@ -392,40 +416,15 @@
}
protected void handleException() throws HyracksDataException {
- ((LSMRTreeComponent) component).getRTree().deactivate();
- ((LSMRTreeComponent) component).getRTree().destroy();
+ ((LSMRTreeImmutableComponent) component).getRTree().deactivate();
+ ((LSMRTreeImmutableComponent) component).getRTree().destroy();
}
}
@Override
- public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException {
- LSMRTreeOpContext ctx = createOpContext();
- ctx.setOperation(IndexOperation.MERGE);
- ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor(ctx);
- ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
- // Ordered scan, ignoring the in-memory RTree.
- // We get back a snapshot of the on-disk RTrees that are going to be
- // merged now, so we can clean them up after the merge has completed.
- List<ILSMComponent> mergingComponents;
- try {
- mergingComponents = lsmHarness.search(cursor, (SearchPredicate) rtreeSearchPred, ctx, false);
- if (mergingComponents.size() <= 1) {
- cursor.close();
- return null;
- }
- } catch (IndexException e) {
- throw new HyracksDataException(e);
- }
-
- LSMComponentFileReferences relMergeFileRefs = getMergeTargetFileName(mergingComponents);
- return new LSMRTreeMergeOperation((ILSMIndexAccessorInternal) createAccessor(null, null), mergingComponents,
- cursor, relMergeFileRefs.getInsertIndexFileReference(), null, callback);
- }
-
- @Override
public void markAsValid(ILSMComponent lsmComponent) throws HyracksDataException {
- RTree rtree = ((LSMRTreeComponent) lsmComponent).getRTree();
+ RTree rtree = ((LSMRTreeImmutableComponent) lsmComponent).getRTree();
forceFlushDirtyPages(rtree);
markAsValidInternal(rtree);
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesComponentFactory.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesComponentFactory.java
index d9573a4..0ca353b 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesComponentFactory.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesComponentFactory.java
@@ -32,7 +32,7 @@
@Override
public ILSMComponent createLSMComponentInstance(LSMComponentFileReferences cfr) throws IndexException {
- return new LSMRTreeComponent(rtreeFactory.createIndexInstance(cfr.getInsertIndexFileReference()), null);
+ return new LSMRTreeImmutableComponent(rtreeFactory.createIndexInstance(cfr.getInsertIndexFileReference()), null);
}
@Override
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java
index fb2fb38..3998108 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java
@@ -78,8 +78,8 @@
// Test parameters.
public static final int LSM_INVINDEX_NUM_DOCS_TO_INSERT = 100;
// Used for full-fledged search test.
- public static final int LSM_INVINDEX_NUM_DOC_QUERIES = 100;
- public static final int LSM_INVINDEX_NUM_RANDOM_QUERIES = 100;
+ public static final int LSM_INVINDEX_NUM_DOC_QUERIES = 1000;
+ public static final int LSM_INVINDEX_NUM_RANDOM_QUERIES = 1000;
// Used for non-search tests to sanity check index searches.
public static final int LSM_INVINDEX_TINY_NUM_DOC_QUERIES = 200;
public static final int LSM_INVINDEX_TINY_NUM_RANDOM_QUERIES = 200;
@@ -90,7 +90,7 @@
// Allocate a generous size to make sure we have enough elements for all tests.
public static final int LSM_INVINDEX_SCAN_COUNT_ARRAY_SIZE = 1000000;
public static final int LSM_INVINDEX_MULTITHREAD_NUM_OPERATIONS = 200;
-
+
}
/* ORIGINAL TEST PARAMETERS: DO NOT EDIT!
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
index ef67cc3..c008f90 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
@@ -36,7 +36,6 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
public class LSMBTreeTestWorker extends AbstractIndexTestWorker {
-
private final LSMBTree lsmBTree;
private final int numKeyFields;
private final ArrayTupleBuilder deleteTb;
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
index 339d7dc..1874539 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
@@ -41,7 +41,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerFactory;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -94,7 +94,7 @@
new LIFOMetaDataFrameFactory());
this.ioScheduler = SynchronousScheduler.INSTANCE;
lsmtree = LSMBTreeUtils.createLSMTree(memBufferCache, memFreePageManager, ioManager, file, bufferCache, fmp,
- typeTraits, cmpFactories, NoMergePolicy.INSTANCE, RefCountingOperationTrackerFactory.INSTANCE,
+ typeTraits, cmpFactories, NoMergePolicy.INSTANCE, ThreadCountingOperationTrackerFactory.INSTANCE,
ioScheduler);
}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java
index 1523846..f7fb071 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java
@@ -40,7 +40,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerFactory;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -87,7 +87,7 @@
this.hyracksFrameSize = AccessMethodTestsConfig.LSM_BTREE_HYRACKS_FRAME_SIZE;
this.ioScheduler = SynchronousScheduler.INSTANCE;
this.mergePolicy = NoMergePolicy.INSTANCE;
- this.opTrackerFactory = RefCountingOperationTrackerFactory.INSTANCE;
+ this.opTrackerFactory = ThreadCountingOperationTrackerFactory.INSTANCE;
}
public LSMBTreeTestHarness(int diskPageSize, int diskNumPages, int diskMaxOpenFiles, int memPageSize,
@@ -100,7 +100,7 @@
this.hyracksFrameSize = hyracksFrameSize;
this.ioScheduler = SynchronousScheduler.INSTANCE;
this.mergePolicy = NoMergePolicy.INSTANCE;
- this.opTrackerFactory = RefCountingOperationTrackerFactory.INSTANCE;
+ this.opTrackerFactory = ThreadCountingOperationTrackerFactory.INSTANCE;
}
public void setUp() throws HyracksException {
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/common/LSMInvertedIndexTestHarness.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/common/LSMInvertedIndexTestHarness.java
index 2a1b69c..e96b637 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/common/LSMInvertedIndexTestHarness.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/common/LSMInvertedIndexTestHarness.java
@@ -38,7 +38,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.DualIndexInMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerFactory;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -83,7 +83,7 @@
this.hyracksFrameSize = AccessMethodTestsConfig.LSM_INVINDEX_HYRACKS_FRAME_SIZE;
this.ioScheduler = SynchronousScheduler.INSTANCE;
this.mergePolicy = NoMergePolicy.INSTANCE;
- this.opTrackerFactory = RefCountingOperationTrackerFactory.INSTANCE;
+ this.opTrackerFactory = ThreadCountingOperationTrackerFactory.INSTANCE;
}
public LSMInvertedIndexTestHarness(int diskPageSize, int diskNumPages, int diskMaxOpenFiles, int memPageSize,
@@ -96,7 +96,7 @@
this.hyracksFrameSize = hyracksFrameSize;
this.ioScheduler = SynchronousScheduler.INSTANCE;
this.mergePolicy = NoMergePolicy.INSTANCE;
- this.opTrackerFactory = RefCountingOperationTrackerFactory.INSTANCE;
+ this.opTrackerFactory = ThreadCountingOperationTrackerFactory.INSTANCE;
}
public void setUp() throws HyracksException {
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java
index 0a443b3..08205b6 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java
@@ -39,7 +39,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.DualIndexInMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerFactory;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -83,7 +83,7 @@
this.hyracksFrameSize = AccessMethodTestsConfig.LSM_RTREE_HYRACKS_FRAME_SIZE;
this.ioScheduler = SynchronousScheduler.INSTANCE;
this.mergePolicy = NoMergePolicy.INSTANCE;
- this.opTrackerFactory = RefCountingOperationTrackerFactory.INSTANCE;
+ this.opTrackerFactory = ThreadCountingOperationTrackerFactory.INSTANCE;
}
public LSMRTreeTestHarness(int diskPageSize, int diskNumPages, int diskMaxOpenFiles, int memPageSize,
@@ -96,7 +96,7 @@
this.hyracksFrameSize = hyracksFrameSize;
this.ioScheduler = SynchronousScheduler.INSTANCE;
this.mergePolicy = NoMergePolicy.INSTANCE;
- this.opTrackerFactory = RefCountingOperationTrackerFactory.INSTANCE;
+ this.opTrackerFactory = ThreadCountingOperationTrackerFactory.INSTANCE;
}
public void setUp() throws HyracksException {
diff --git a/pom.xml b/pom.xml
index ce15c7f..49a5887 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,7 +30,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<forkMode>pertest</forkMode>
- <argLine>-enableassertions -Djava.util.logging.config.file=${user.home}/logging.properties -Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=8000,suspend=n ${jvm.extraargs}</argLine>
+ <argLine>-enableassertions -Djava.util.logging.config.file=${user.home}/logging.properties -Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=8000,suspend=n ${jvm.extraargs} -Xmx2048m</argLine>
</configuration>
</plugin>
</plugins>