* The cleanup of the merged components in the LSM indexes is now the responsibility of either: the last existing search thread (in case the merge process is over and there are still search threads accessing the merged components), or the merge thread itself (in case the merge process is over and there are no search threads accessing the merged components). * Allowed concurrent merges to occur at the same time instead of the old design which only allowed one merge process at a time per LSM index.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@2573 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java
index ce7697e..8990f3f 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java
@@ -16,5 +16,13 @@
package edu.uci.ics.hyracks.storage.am.common.ophelpers;
public enum IndexOperation {
- INSERT, DELETE, UPDATE, UPSERT, SEARCH, DISKORDERSCAN, PHYSICALDELETE, NOOP
+ INSERT,
+ DELETE,
+ UPDATE,
+ UPSERT,
+ SEARCH,
+ DISKORDERSCAN,
+ PHYSICALDELETE,
+ NOOP,
+ MERGE
}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 651083c..ed53290 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -16,9 +16,9 @@
package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
import java.io.File;
+import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
-import java.util.concurrent.atomic.AtomicInteger;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -44,6 +44,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IInMemoryBufferCache;
@@ -62,6 +63,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentState;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMFlushOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
@@ -199,6 +201,31 @@
}
@Override
+ public List<ILSMComponent> getOperationalComponents(IIndexOperationContext ctx) {
+ List<ILSMComponent> operationalComponents = new ArrayList<ILSMComponent>();
+ switch (ctx.getOperation()) {
+ case SEARCH:
+ case INSERT:
+ // TODO: We should add the mutable component at some point.
+ operationalComponents.addAll(immutableComponents);
+ break;
+ case MERGE:
+ // TODO: determining the participating components in a merge should probably the task of the merge policy.
+ if (immutableComponents.size() > 1) {
+ for (ILSMComponent c : immutableComponents) {
+ if (c.negativeCompareAndSet(LSMComponentState.MERGING, LSMComponentState.MERGING)) {
+ operationalComponents.add(c);
+ }
+ }
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
+ }
+ return operationalComponents;
+ }
+
+ @Override
public void insertUpdateOrDelete(ITupleReference tuple, IIndexOperationContext ictx) throws HyracksDataException,
IndexException {
LSMBTreeOpContext ctx = (LSMBTreeOpContext) ictx;
@@ -240,6 +267,7 @@
memCursor.close();
}
+ // TODO: Can we just remove the above code that search the mutable component and do it together with the search call below? i.e. instead of passing false to the lsmHarness.search(), we pass true to include the mutable component?
// the key was not in the inmemory component, so check the disk components
lsmHarness.search(searchCursor, predicate, ctx, false);
try {
@@ -303,20 +331,25 @@
}
@Override
- public void search(IIndexCursor cursor, List<ILSMComponent> diskComponents, ISearchPredicate pred,
- IIndexOperationContext ictx, boolean includeMemComponent, AtomicInteger searcherRefCount)
- throws HyracksDataException, IndexException {
+ public void search(IIndexCursor cursor, List<ILSMComponent> immutableComponents, ISearchPredicate pred,
+ IIndexOperationContext ictx, boolean includeMutableComponent) throws HyracksDataException, IndexException {
LSMBTreeOpContext ctx = (LSMBTreeOpContext) ictx;
LSMBTreeRangeSearchCursor lsmTreeCursor = (LSMBTreeRangeSearchCursor) cursor;
- int numDiskComponents = diskComponents.size();
- int numBTrees = (includeMemComponent) ? numDiskComponents + 1 : numDiskComponents;
+ int numDiskComponents = immutableComponents.size();
+ int numBTrees = (includeMutableComponent) ? numDiskComponents + 1 : numDiskComponents;
+
+ List<ILSMComponent> operationalComponents = new ArrayList<ILSMComponent>();
+ if (includeMutableComponent) {
+ operationalComponents.add(getMutableComponent());
+ }
+ operationalComponents.addAll(immutableComponents);
LSMBTreeCursorInitialState initialState = new LSMBTreeCursorInitialState(numBTrees, insertLeafFrameFactory,
- ctx.cmp, includeMemComponent, searcherRefCount, lsmHarness, ctx.memBTreeAccessor, pred,
- ctx.searchCallback);
+ ctx.cmp, includeMutableComponent, lsmHarness, ctx.memBTreeAccessor, pred, ctx.searchCallback,
+ operationalComponents);
lsmTreeCursor.open(initialState, pred);
int cursorIx;
- if (includeMemComponent) {
+ if (includeMutableComponent) {
// Open cursor of in-memory BTree at index 0.
ctx.memBTreeAccessor.search(lsmTreeCursor.getCursor(0), pred);
// Skip 0 because it is the in-memory BTree.
@@ -328,7 +361,7 @@
// Open cursors of on-disk BTrees.
ITreeIndexAccessor[] diskBTreeAccessors = new ITreeIndexAccessor[numDiskComponents];
int diskBTreeIx = 0;
- ListIterator<ILSMComponent> diskBTreesIter = diskComponents.listIterator();
+ ListIterator<ILSMComponent> diskBTreesIter = immutableComponents.listIterator();
while (diskBTreesIter.hasNext()) {
BTree diskBTree = (BTree) ((LSMBTreeComponent) diskBTreesIter.next()).getBTree();
diskBTreeAccessors[diskBTreeIx] = diskBTree.createAccessor(NoOpOperationCallback.INSTANCE,
@@ -371,15 +404,6 @@
}
@Override
- public void cleanUpAfterMerge(List<ILSMComponent> mergedComponents) throws HyracksDataException {
- for (ILSMComponent c : mergedComponents) {
- BTree oldBTree = (BTree) ((LSMBTreeComponent) c).getBTree();
- oldBTree.deactivate();
- oldBTree.destroy();
- }
- }
-
- @Override
public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput) throws TreeIndexException {
return new LSMBTreeBulkLoader(fillLevel, verifyInput);
}
@@ -500,6 +524,7 @@
public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException {
LSMBTreeOpContext ctx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ ctx.setOperation(IndexOperation.MERGE);
ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor(ctx);
RangePredicate rangePred = new RangePredicate(null, null, true, true, null, null);
// Ordered scan, ignoring the in-memory BTree.
@@ -559,4 +584,9 @@
InMemoryBufferCache memBufferCache = (InMemoryBufferCache) mutableComponent.getBTree().getBufferCache();
return memBufferCache.getNumPages() * memBufferCache.getPageSize();
}
+
+ @Override
+ public ILSMComponent getMutableComponent() {
+ return mutableComponent;
+ }
}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeComponent.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeComponent.java
index b89e332..5264f4f 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeComponent.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeComponent.java
@@ -15,11 +15,12 @@
package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentState;
+import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMComponent;
-public class LSMBTreeComponent implements ILSMComponent {
+public class LSMBTreeComponent extends AbstractLSMComponent {
private final BTree btree;
@@ -28,39 +29,15 @@
}
@Override
- public void activate() {
- // TODO Auto-generated method stub
-
+ public void destroy() throws HyracksDataException {
+ btree.deactivate();
+ btree.destroy();
}
@Override
- public void deactivate() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void threadEnter() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void threadExit() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void setState(LSMComponentState state) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public LSMComponentState getState() {
- // TODO Auto-generated method stub
- return null;
+ public void reset() throws HyracksDataException {
+ ((InMemoryFreePageManager) btree.getFreePageManager()).reset();
+ btree.clear();
}
public BTree getBTree() {
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java
index 37958c7..84f1c64 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeCursorInitialState.java
@@ -15,7 +15,7 @@
package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.List;
import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
@@ -32,25 +33,27 @@
private final ITreeIndexFrameFactory leafFrameFactory;
private MultiComparator cmp;
private final boolean includeMemComponent;
- private final AtomicInteger searcherfRefCount;
private final ILSMHarness lsmHarness;
private final IIndexAccessor memBtreeAccessor;
private final ISearchPredicate predicate;
private ISearchOperationCallback searchCallback;
+ private final List<ILSMComponent> operationalComponents;
+
public LSMBTreeCursorInitialState(int numBTrees, ITreeIndexFrameFactory leafFrameFactory, MultiComparator cmp,
- boolean includeMemComponent, AtomicInteger searcherfRefCount, ILSMHarness lsmHarness,
- IIndexAccessor memBtreeAccessor, ISearchPredicate predicate, ISearchOperationCallback searchCallback) {
+ boolean includeMemComponent, ILSMHarness lsmHarness, IIndexAccessor memBtreeAccessor,
+ ISearchPredicate predicate, ISearchOperationCallback searchCallback,
+ List<ILSMComponent> operationalComponents) {
this.numBTrees = numBTrees;
this.leafFrameFactory = leafFrameFactory;
this.cmp = cmp;
this.includeMemComponent = includeMemComponent;
- this.searcherfRefCount = searcherfRefCount;
this.lsmHarness = lsmHarness;
this.searchCallback = searchCallback;
this.memBtreeAccessor = memBtreeAccessor;
this.predicate = predicate;
+ this.operationalComponents = operationalComponents;
}
public int getNumBTrees() {
@@ -70,10 +73,6 @@
public void setPage(ICachedPage page) {
}
- public AtomicInteger getSearcherRefCount() {
- return searcherfRefCount;
- }
-
public boolean getIncludeMemComponent() {
return includeMemComponent;
}
@@ -92,6 +91,10 @@
this.searchCallback = searchCallback;
}
+ public List<ILSMComponent> getOperationalComponents() {
+ return operationalComponents;
+ }
+
public IIndexAccessor getMemBTreeAccessor() {
return memBtreeAccessor;
}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index bf7cb81..a96ac81 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -137,7 +137,7 @@
rangeCursors[i] = new BTreeRangeSearchCursor(leafFrame, false);
}
includeMemComponent = lsmInitialState.getIncludeMemComponent();
- searcherRefCount = lsmInitialState.getSearcherRefCount();
+ operationalComponents = lsmInitialState.getOperationalComponents();
lsmHarness = lsmInitialState.getLSMHarness();
searchCallback = lsmInitialState.getSearchOperationCallback();
memBTreeAccessor = lsmInitialState.getMemBTreeAccessor();
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponent.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponent.java
index c1e5eb2..6d3f050 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponent.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponent.java
@@ -1,5 +1,6 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentState;
public interface ILSMComponent {
@@ -11,7 +12,19 @@
public void threadExit();
+ public int getThreadReferenceCount();
+
public void setState(LSMComponentState state);
+ public boolean negativeCompareAndSet(LSMComponentState compare, LSMComponentState update);
+
public LSMComponentState getState();
+
+ // TODO: create two interfaces one for immutable and another for mutable components.
+
+ // Only for immutable components.
+ public void destroy() throws HyracksDataException;
+
+ // Only for mutable components.
+ public void reset() throws HyracksDataException;
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 047b59b..65b6c9d 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -16,7 +16,6 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.api;
import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -27,14 +26,15 @@
public interface ILSMHarness {
public boolean insertUpdateOrDelete(ITupleReference tuple, ILSMIndexOperationContext ictx, boolean tryOperation)
throws HyracksDataException, IndexException;
-
- public boolean noOp(ILSMIndexOperationContext ictx, boolean tryOperation) throws HyracksDataException;
+
+ public boolean noOp(ILSMIndexOperationContext ictx, boolean tryOperation) throws HyracksDataException;
public List<ILSMComponent> search(IIndexCursor cursor, ISearchPredicate pred, ILSMIndexOperationContext ctx,
- boolean includeMemComponent) throws HyracksDataException, IndexException;
+ boolean includeMutableComponent) throws HyracksDataException, IndexException;
- public void closeSearchCursor(AtomicInteger searcherRefCount, boolean includeMemComponent, ILSMIndexOperationContext ctx)
- throws HyracksDataException;
+ // Eventually includeMutableComponent and ctx should be removed.
+ public void closeSearchCursor(List<ILSMComponent> operationalComponents, boolean includeMutableComponent,
+ ILSMIndexOperationContext ctx) throws HyracksDataException;
public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException,
IndexException;
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java
index 4dc442f..53a5c00 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java
@@ -16,7 +16,6 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.api;
import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -28,31 +27,32 @@
public interface ILSMIndexInternal extends ILSMIndex {
- public ILSMComponent merge(List<ILSMComponent> mergedComponents, ILSMIOOperation operation)
- throws HyracksDataException, IndexException;
-
public void insertUpdateOrDelete(ITupleReference tuple, IIndexOperationContext ictx) throws HyracksDataException,
IndexException;
public void search(IIndexCursor cursor, List<ILSMComponent> diskComponents, ISearchPredicate pred,
- IIndexOperationContext ictx, boolean includeMemComponent, AtomicInteger searcherRefCount)
+ IIndexOperationContext ictx, boolean includeMemComponent) throws HyracksDataException, IndexException;
+
+ public ILSMComponent flush(ILSMIOOperation operation) throws HyracksDataException, IndexException;
+
+ public ILSMComponent merge(List<ILSMComponent> mergedComponents, ILSMIOOperation operation)
throws HyracksDataException, IndexException;
public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException,
IndexException;
- public void addMergedComponent(ILSMComponent newComponent, List<ILSMComponent> mergedComponents);
+ public void addComponent(ILSMComponent index);
- public void cleanUpAfterMerge(List<ILSMComponent> mergedComponents) throws HyracksDataException;
+ public void subsumeMergedComponents(ILSMComponent newComponent, List<ILSMComponent> mergedComponents);
- public ILSMComponent flush(ILSMIOOperation operation) throws HyracksDataException, IndexException;
-
- public void addFlushedComponent(ILSMComponent index);
+ public List<ILSMComponent> getOperationalComponents(IIndexOperationContext ctx);
public IInMemoryFreePageManager getInMemoryFreePageManager();
public void resetMutableComponent() throws HyracksDataException;
+ public ILSMComponent getMutableComponent();
+
public List<ILSMComponent> getImmutableComponents();
public void markAsValid(ILSMComponent lsmComponent) throws HyracksDataException;
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java
new file mode 100644
index 0000000..e1a3c36
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+
+public abstract class AbstractLSMComponent implements ILSMComponent {
+
+ private final AtomicInteger threadRef = new AtomicInteger();
+ private LSMComponentState state;
+
+ private final Object componentSync = new Object();
+
+ @Override
+ public void activate() {
+
+ }
+
+ @Override
+ public void deactivate() {
+
+ }
+
+ @Override
+ public void threadEnter() {
+ threadRef.incrementAndGet();
+ }
+
+ @Override
+ public void threadExit() {
+ threadRef.decrementAndGet();
+ }
+
+ @Override
+ public int getThreadReferenceCount() {
+ return threadRef.get();
+ }
+
+ @Override
+ public void setState(LSMComponentState state) {
+ synchronized (componentSync) {
+ this.state = state;
+ }
+ }
+
+ @Override
+ public LSMComponentState getState() {
+ synchronized (componentSync) {
+ return state;
+ }
+ }
+
+ @Override
+ public boolean negativeCompareAndSet(LSMComponentState compare, LSMComponentState update) {
+ synchronized (componentSync) {
+ if (state != compare) {
+ state = update;
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 9ba101b..13505d8 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -118,14 +118,15 @@
}
@Override
- public void addFlushedComponent(ILSMComponent index) {
+ public void addComponent(ILSMComponent index) {
immutableComponents.addFirst(index);
}
@Override
- public void addMergedComponent(ILSMComponent newComponent, List<ILSMComponent> mergedComponents) {
+ public void subsumeMergedComponents(ILSMComponent newComponent, List<ILSMComponent> mergedComponents) {
+ int firstComponentIndex = immutableComponents.indexOf(mergedComponents.get(0));
immutableComponents.removeAll(mergedComponents);
- immutableComponents.addLast(newComponent);
+ immutableComponents.add(firstComponentIndex, newComponent);
}
@Override
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateScheduler.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateScheduler.java
index c84a852..03a41dc 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateScheduler.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateScheduler.java
@@ -9,14 +9,11 @@
INSTANCE;
@Override
- public void scheduleOperation(ILSMIOOperation operation) {
+ public void scheduleOperation(ILSMIOOperation operation) throws HyracksDataException {
try {
operation.perform();
- } catch (HyracksDataException e) {
- e.printStackTrace();
} catch (IndexException e) {
- e.printStackTrace();
+ throw new HyracksDataException(e);
}
}
-
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentState.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentState.java
index df4f07d..e554a6e 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentState.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentState.java
@@ -16,5 +16,8 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
public enum LSMComponentState {
-
+ FLUSHING,
+ MERGING,
+ DONE_FLUSHING,
+ DONE_MERGING
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index bf0f90b..7eef5f4 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -17,8 +17,6 @@
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -51,23 +49,12 @@
*/
public class LSMHarness implements ILSMHarness {
protected final Logger LOGGER = Logger.getLogger(LSMHarness.class.getName());
- protected static final long AFTER_MERGE_CLEANUP_SLEEP = 100;
private ILSMIndexInternal lsmIndex;
// All accesses to the LSM-Tree's on-disk components are synchronized on diskComponentsSync.
private Object diskComponentsSync = new Object();
- // For synchronizing searchers with a concurrent merge.
- private AtomicBoolean isMerging = new AtomicBoolean(false);
- private AtomicInteger searcherRefCountA = new AtomicInteger(0);
- private AtomicInteger searcherRefCountB = new AtomicInteger(0);
-
- // Represents the current number of searcher threads that are operating on
- // the unmerged on-disk Trees.
- // We alternate between searcherRefCountA and searcherRefCountB.
- private AtomicInteger searcherRefCount = searcherRefCountA;
-
// Flush and Merge Policies
private final ILSMFlushController flushController;
private final ILSMMergePolicy mergePolicy;
@@ -96,14 +83,12 @@
if (!opTracker.beforeOperation(ctx.getSearchOperationCallback(), ctx.getModificationCallback(), tryOperation)) {
return false;
}
- // It is possible, due to concurrent execution of operations, that an operation will
- // fail. In such a case, simply retry the operation. Refer to the specific LSMIndex code
- // to see exactly why an operation might fail.
try {
lsmIndex.insertUpdateOrDelete(tuple, ctx);
} finally {
threadExit(ctx);
}
+
return true;
}
@@ -116,6 +101,7 @@
return true;
}
+ @Override
public void flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Flushing LSM-Index: " + lsmIndex);
@@ -136,7 +122,7 @@
}
lsmIndex.resetMutableComponent();
synchronized (diskComponentsSync) {
- lsmIndex.addFlushedComponent(newComponent);
+ lsmIndex.addComponent(newComponent);
mergePolicy.diskComponentAdded(lsmIndex, lsmIndex.getImmutableComponents().size());
}
@@ -144,56 +130,47 @@
flushController.setFlushStatus(lsmIndex, false);
}
+ @Override
public List<ILSMComponent> search(IIndexCursor cursor, ISearchPredicate pred, ILSMIndexOperationContext ctx,
- boolean includeMemComponent) throws HyracksDataException, IndexException {
+ boolean includeMutableComponent) throws HyracksDataException, IndexException {
// If the search doesn't include the in-memory component, then we don't have
// to synchronize with a flush.
- if (includeMemComponent) {
+ if (includeMutableComponent) {
opTracker.beforeOperation(ctx.getSearchOperationCallback(), ctx.getModificationCallback(), false);
}
// Get a snapshot of the current on-disk Trees.
- // If includeMemComponent is true, then no concurrent
+ // If includeMutableComponent is true, then no concurrent
// flush can add another on-disk Tree (due to threadEnter());
- // If includeMemComponent is false, then it is possible that a concurrent
+ // If includeMutableComponent is false, then it is possible that a concurrent
// flush adds another on-disk Tree.
// Since this mode is only used for merging trees, it doesn't really
// matter if the merge excludes the new on-disk Tree.
- List<ILSMComponent> diskComponentSnapshot = new ArrayList<ILSMComponent>();
- AtomicInteger localSearcherRefCount = null;
+ List<ILSMComponent> operationalComponents;
synchronized (diskComponentsSync) {
- diskComponentSnapshot.addAll(lsmIndex.getImmutableComponents());
- localSearcherRefCount = searcherRefCount;
- localSearcherRefCount.incrementAndGet();
+ operationalComponents = lsmIndex.getOperationalComponents(ctx);
+ }
+ for (ILSMComponent c : operationalComponents) {
+ c.threadEnter();
}
- lsmIndex.search(cursor, diskComponentSnapshot, pred, ctx, includeMemComponent, localSearcherRefCount);
- return diskComponentSnapshot;
+ lsmIndex.search(cursor, operationalComponents, pred, ctx, includeMutableComponent);
+ return operationalComponents;
}
+ @Override
public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException,
IndexException {
- if (!isMerging.compareAndSet(false, true)) {
- throw new LSMMergeInProgressException(
- "Merge already in progress. Only one merge process allowed at a time.");
- }
-
ILSMIOOperation mergeOp = lsmIndex.createMergeOperation(callback);
- if (mergeOp == null) {
- isMerging.set(false);
- }
return mergeOp;
}
+ @Override
public void merge(ILSMIOOperation operation) throws HyracksDataException, IndexException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Merging LSM-Index: " + lsmIndex);
}
- // Point to the current searcher ref count, so we can wait for it later
- // (after we swap the searcher ref count).
- AtomicInteger localSearcherRefCount = searcherRefCount;
-
List<ILSMComponent> mergedComponents = new ArrayList<ILSMComponent>();
ILSMComponent newComponent = null;
try {
@@ -203,7 +180,6 @@
// No merge happened.
if (newComponent == null) {
- isMerging.set(false);
return;
}
@@ -217,49 +193,40 @@
operation.getCallback().afterFinalize(operation, newComponent);
}
- // Remove the old Trees from the list, and add the new merged Tree(s).
- // Also, swap the searchRefCount.
- synchronized (diskComponentsSync) {
- lsmIndex.addMergedComponent(newComponent, mergedComponents);
- // Swap the searcher ref count reference, and reset it to zero.
- if (searcherRefCount == searcherRefCountA) {
- searcherRefCount = searcherRefCountB;
- } else {
- searcherRefCount = searcherRefCountA;
+ // Remove the old components from the list, and add the new merged component(s).
+ try {
+ synchronized (diskComponentsSync) {
+ lsmIndex.subsumeMergedComponents(newComponent, mergedComponents);
}
- searcherRefCount.set(0);
- }
-
- // Wait for all searchers that are still accessing the old on-disk
- // Trees, then perform the final cleanup of the old Trees.
- while (localSearcherRefCount.get() > 0) {
- try {
- Thread.sleep(AFTER_MERGE_CLEANUP_SLEEP);
- } catch (InterruptedException e) {
- // Propagate the exception to the caller, so that an appropriate
- // cleanup action can be taken.
- throw new HyracksDataException(e);
+ } finally {
+ // Cleanup merged components in case there are no more searchers accessing them.
+ for (ILSMComponent c : mergedComponents) {
+ c.setState(LSMComponentState.DONE_MERGING);
+ if (c.getThreadReferenceCount() == 0) {
+ c.destroy();
+ }
}
}
-
- // Cleanup. At this point we have guaranteed that no searchers are
- // touching the old on-disk Trees (localSearcherRefCount == 0).
- lsmIndex.cleanUpAfterMerge(mergedComponents);
- isMerging.set(false);
}
@Override
- public void closeSearchCursor(AtomicInteger searcherRefCount, boolean includeMemComponent,
+ public void closeSearchCursor(List<ILSMComponent> operationalComponents, boolean includeMutableComponent,
ILSMIndexOperationContext ctx) throws HyracksDataException {
- // If the in-memory Tree was not included in the search, then we don't
- // need to synchronize with a flush.
- if (includeMemComponent) {
+ // TODO: we should not worry about the mutable component.
+ if (includeMutableComponent) {
threadExit(ctx);
}
- // A merge may be waiting on this searcher to finish searching the on-disk components.
- // Decrement the searcherRefCount so that the merge process is able to cleanup any old
- // on-disk components.
- searcherRefCount.decrementAndGet();
+ try {
+ for (ILSMComponent c : operationalComponents) {
+ c.threadExit();
+ }
+ } finally {
+ for (ILSMComponent c : operationalComponents) {
+ if (c.getState() == LSMComponentState.DONE_MERGING && c.getThreadReferenceCount() == 0) {
+ c.destroy();
+ }
+ }
+ }
}
@Override
@@ -270,7 +237,7 @@
// information to mark the tree as valid).
lsmIndex.markAsValid(index);
synchronized (diskComponentsSync) {
- lsmIndex.addFlushedComponent(index);
+ lsmIndex.addComponent(index);
mergePolicy.diskComponentAdded(lsmIndex, lsmIndex.getImmutableComponents().size());
}
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
index f471301..3b66c72 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
@@ -16,6 +16,7 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
import java.util.Comparator;
+import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicInteger;
@@ -25,6 +26,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMTreeTupleReference;
@@ -42,7 +44,9 @@
protected AtomicInteger searcherRefCount;
protected ILSMHarness lsmHarness;
protected final ILSMIndexOperationContext opCtx;
-
+
+ protected List<ILSMComponent> operationalComponents;
+
public LSMIndexSearchCursor(ILSMIndexOperationContext opCtx) {
this.opCtx = opCtx;
outputElement = null;
@@ -79,7 +83,7 @@
rangeCursors = null;
if (searcherRefCount != null) {
- lsmHarness.closeSearchCursor(searcherRefCount, includeMemComponent, opCtx);
+ lsmHarness.closeSearchCursor(operationalComponents, includeMemComponent, opCtx);
}
}
@@ -111,7 +115,7 @@
}
rangeCursors = null;
} finally {
- lsmHarness.closeSearchCursor(searcherRefCount, includeMemComponent, opCtx);
+ lsmHarness.closeSearchCursor(operationalComponents, includeMemComponent, opCtx);
}
}
}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 1b67406..74deb47 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -17,7 +17,6 @@
import java.io.File;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
@@ -62,6 +61,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentState;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory.InMemoryInvertedIndex;
@@ -163,6 +163,30 @@
}
}
+ @Override
+ public List<ILSMComponent> getOperationalComponents(IIndexOperationContext ctx) {
+ List<ILSMComponent> operationalComponents = new ArrayList<ILSMComponent>();
+ switch (ctx.getOperation()) {
+ case SEARCH:
+ // TODO: We should add the mutable component at some point.
+ operationalComponents.addAll(immutableComponents);
+ break;
+ case MERGE:
+ // TODO: determining the participating components in a merge should probably the task of the merge policy.
+ if (immutableComponents.size() > 1) {
+ for (ILSMComponent c : immutableComponents) {
+ if (c.negativeCompareAndSet(LSMComponentState.MERGING, LSMComponentState.MERGING)) {
+ operationalComponents.add(c);
+ }
+ }
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
+ }
+ return operationalComponents;
+ }
+
protected LSMInvertedIndexComponent createDiskInvIndexComponent(ILSMComponentFactory factory,
FileReference dictBTreeFileRef, FileReference btreeFileRef, boolean create) throws HyracksDataException,
IndexException {
@@ -315,12 +339,11 @@
@Override
public void search(IIndexCursor cursor, List<ILSMComponent> immutableComponents, ISearchPredicate pred,
- IIndexOperationContext ictx, boolean includemutableComponent, AtomicInteger searcherRefCount)
- throws HyracksDataException, IndexException {
- int numComponents = (includemutableComponent) ? immutableComponents.size() : immutableComponents.size() + 1;
+ IIndexOperationContext ictx, boolean includeMutableComponent) throws HyracksDataException, IndexException {
+ int numComponents = (includeMutableComponent) ? immutableComponents.size() : immutableComponents.size() + 1;
ArrayList<IIndexAccessor> indexAccessors = new ArrayList<IIndexAccessor>(numComponents);
ArrayList<IIndexAccessor> deletedKeysBTreeAccessors = new ArrayList<IIndexAccessor>(numComponents);
- if (includemutableComponent) {
+ if (includeMutableComponent) {
IIndexAccessor invIndexAccessor = mutableComponent.getInvIndex().createAccessor(
NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
indexAccessors.add(invIndexAccessor);
@@ -337,28 +360,35 @@
NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
deletedKeysBTreeAccessors.add(deletedKeysAccessor);
}
- ICursorInitialState initState = createCursorInitialState(pred, ictx, includemutableComponent, searcherRefCount,
- indexAccessors, deletedKeysBTreeAccessors);
+
+ ICursorInitialState initState = createCursorInitialState(pred, ictx, includeMutableComponent, indexAccessors,
+ deletedKeysBTreeAccessors);
cursor.open(initState, pred);
}
private ICursorInitialState createCursorInitialState(ISearchPredicate pred, IIndexOperationContext ictx,
- boolean includemutableComponent, AtomicInteger searcherRefCount, ArrayList<IIndexAccessor> indexAccessors,
+ boolean includeMutableComponent, ArrayList<IIndexAccessor> indexAccessors,
ArrayList<IIndexAccessor> deletedKeysBTreeAccessors) {
ICursorInitialState initState = null;
PermutingTupleReference keysOnlyTuple = createKeysOnlyTupleReference();
MultiComparator keyCmp = MultiComparator.createIgnoreFieldLength(invListCmpFactories);
+ List<ILSMComponent> operationalComponents = new ArrayList<ILSMComponent>();
+ if (includeMutableComponent) {
+ operationalComponents.add(getMutableComponent());
+ }
+ operationalComponents.addAll(immutableComponents);
+
// TODO: This check is not pretty, but it does the job. Come up with something more OO in the future.
// Distinguish between regular searches and range searches (mostly used in merges).
if (pred instanceof InvertedIndexSearchPredicate) {
initState = new LSMInvertedIndexSearchCursorInitialState(keyCmp, keysOnlyTuple, indexAccessors,
- deletedKeysBTreeAccessors, ictx, includemutableComponent, searcherRefCount, lsmHarness);
+ deletedKeysBTreeAccessors, ictx, includeMutableComponent, lsmHarness, operationalComponents);
} else {
InMemoryInvertedIndex memInvIndex = (InMemoryInvertedIndex) mutableComponent.getInvIndex();
MultiComparator tokensAndKeysCmp = MultiComparator.create(memInvIndex.getBTree().getComparatorFactories());
initState = new LSMInvertedIndexRangeSearchCursorInitialState(tokensAndKeysCmp, keyCmp, keysOnlyTuple,
- includemutableComponent, searcherRefCount, lsmHarness, indexAccessors, deletedKeysBTreeAccessors,
- pred);
+ includeMutableComponent, lsmHarness, indexAccessors, deletedKeysBTreeAccessors, pred,
+ operationalComponents);
}
return initState;
}
@@ -412,7 +442,7 @@
public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException,
IndexException {
LSMInvertedIndexOpContext ctx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- ctx.setOperation(IndexOperation.SEARCH);
+ ctx.setOperation(IndexOperation.MERGE);
IIndexCursor cursor = new LSMInvertedIndexRangeSearchCursor(ctx);
RangePredicate mergePred = new RangePredicate(null, null, true, true, null, null);
@@ -441,19 +471,6 @@
}
@Override
- public void cleanUpAfterMerge(List<ILSMComponent> mergedComponents) throws HyracksDataException {
- for (ILSMComponent c : mergedComponents) {
- LSMInvertedIndexComponent component = (LSMInvertedIndexComponent) c;
- BTree oldDeletedKeysBTree = component.getDeletedKeysBTree();
- oldDeletedKeysBTree.deactivate();
- oldDeletedKeysBTree.destroy();
- IInvertedIndex oldInvIndex = component.getInvIndex();
- oldInvIndex.deactivate();
- oldInvIndex.destroy();
- }
- }
-
- @Override
public ILSMComponent flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
LSMInvertedIndexFlushOperation flushOp = (LSMInvertedIndexFlushOperation) operation;
@@ -629,4 +646,9 @@
int maxPageId = invIndex.getInvListsMaxPageId();
forceFlushDirtyPages(bufferCache, fileId, startPageId, maxPageId);
}
+
+ @Override
+ public ILSMComponent getMutableComponent() {
+ return mutableComponent;
+ }
}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexComponent.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexComponent.java
index ce82317..d9a65b9 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexComponent.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexComponent.java
@@ -15,12 +15,13 @@
package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentState;
+import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
-public class LSMInvertedIndexComponent implements ILSMComponent {
+public class LSMInvertedIndexComponent extends AbstractLSMComponent {
private final IInvertedIndex invIndex;
private final BTree deletedKeysBTree;
@@ -31,39 +32,18 @@
}
@Override
- public void activate() {
- // TODO Auto-generated method stub
-
+ public void destroy() throws HyracksDataException {
+ invIndex.deactivate();
+ invIndex.destroy();
+ deletedKeysBTree.deactivate();
+ deletedKeysBTree.destroy();
}
@Override
- public void deactivate() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void threadEnter() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void threadExit() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void setState(LSMComponentState state) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public LSMComponentState getState() {
- // TODO Auto-generated method stub
- return null;
+ public void reset() throws HyracksDataException {
+ ((InMemoryFreePageManager) deletedKeysBTree.getFreePageManager()).reset();
+ invIndex.clear();
+ deletedKeysBTree.clear();
}
public IInvertedIndex getInvIndex() {
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
index 0b9e354..43626f5 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
@@ -37,16 +37,16 @@
protected ArrayList<IIndexAccessor> deletedKeysBTreeAccessors;
protected PermutingTupleReference keysOnlyTuple;
protected RangePredicate keySearchPred;
-
+
public LSMInvertedIndexRangeSearchCursor(ILSMIndexOperationContext opCtx) {
super(opCtx);
}
-
+
@Override
public void next() throws HyracksDataException {
super.next();
}
-
+
@Override
public void open(ICursorInitialState initState, ISearchPredicate searchPred) throws IndexException,
HyracksDataException {
@@ -59,7 +59,7 @@
rangeCursors[i] = invIndexAccessor.createRangeSearchCursor();
invIndexAccessor.rangeSearch(rangeCursors[i], lsmInitState.getSearchPredicate());
}
-
+
// For searching the deleted-keys BTrees.
this.keysOnlyTuple = lsmInitState.getKeysOnlyTuple();
deletedKeysBTreeAccessors = lsmInitState.getDeletedKeysBTreeAccessors();
@@ -68,14 +68,14 @@
}
MultiComparator keyCmp = lsmInitState.getKeyComparator();
keySearchPred = new RangePredicate(keysOnlyTuple, keysOnlyTuple, true, true, keyCmp, keyCmp);
-
- searcherRefCount = lsmInitState.getSearcherRefCount();
+
lsmHarness = lsmInitState.getLSMHarness();
includeMemComponent = lsmInitState.getIncludeMemComponent();
+ operationalComponents = lsmInitState.getOperationalComponents();
setPriorityQueueComparator();
initPriorityQueue();
}
-
+
/**
* Check deleted-keys BTrees whether they contain the key in the checkElement's tuple.
*/
@@ -84,7 +84,7 @@
keysOnlyTuple.reset(checkElement.getTuple());
int end = checkElement.getCursorIndex();
for (int i = 0; i < end; i++) {
- deletedKeysBTreeCursor.reset();
+ deletedKeysBTreeCursor.reset();
try {
deletedKeysBTreeAccessors.get(i).search(deletedKeysBTreeCursor, keySearchPred);
if (deletedKeysBTreeCursor.hasNext()) {
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursorInitialState.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursorInitialState.java
index 2ed143d..5a81a2e 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursorInitialState.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursorInitialState.java
@@ -16,7 +16,7 @@
package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.List;
import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingTupleReference;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
@@ -31,7 +32,6 @@
private final MultiComparator tokensAndKeysCmp;
private final MultiComparator keyCmp;
- private final AtomicInteger searcherRefCount;
private final ILSMHarness lsmHarness;
private final ArrayList<IIndexAccessor> indexAccessors;
@@ -40,20 +40,21 @@
private final PermutingTupleReference keysOnlyTuple;
private final boolean includeMemComponent;
+ private final List<ILSMComponent> operationalComponents;
public LSMInvertedIndexRangeSearchCursorInitialState(MultiComparator tokensAndKeysCmp, MultiComparator keyCmp,
- PermutingTupleReference keysOnlyTuple, boolean includeMemComponent, AtomicInteger searcherRefCount,
- ILSMHarness lsmHarness, ArrayList<IIndexAccessor> indexAccessors,
- ArrayList<IIndexAccessor> deletedKeysBTreeAccessors, ISearchPredicate predicate) {
+ PermutingTupleReference keysOnlyTuple, boolean includeMemComponent, ILSMHarness lsmHarness,
+ ArrayList<IIndexAccessor> indexAccessors, ArrayList<IIndexAccessor> deletedKeysBTreeAccessors,
+ ISearchPredicate predicate, List<ILSMComponent> operationalComponents) {
this.tokensAndKeysCmp = tokensAndKeysCmp;
this.keyCmp = keyCmp;
this.keysOnlyTuple = keysOnlyTuple;
- this.searcherRefCount = searcherRefCount;
this.lsmHarness = lsmHarness;
this.indexAccessors = indexAccessors;
this.deletedKeysBTreeAccessors = deletedKeysBTreeAccessors;
this.predicate = predicate;
this.includeMemComponent = includeMemComponent;
+ this.operationalComponents = operationalComponents;
}
public int getNumComponents() {
@@ -69,8 +70,8 @@
public void setPage(ICachedPage page) {
}
- public AtomicInteger getSearcherRefCount() {
- return searcherRefCount;
+ public List<ILSMComponent> getOperationalComponents() {
+ return operationalComponents;
}
public ILSMHarness getLSMHarness() {
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java
index 0f4777f..b6d3bb7 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java
@@ -15,7 +15,6 @@
package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -27,6 +26,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.exceptions.OccurrenceThresholdPanicException;
@@ -43,7 +43,6 @@
private boolean tupleConsumed = true;
private ILSMHarness harness;
private boolean includeMemComponent;
- private AtomicInteger searcherRefCount;
private List<IIndexAccessor> indexAccessors;
private ISearchPredicate searchPred;
private ISearchOperationCallback searchCallback;
@@ -54,12 +53,14 @@
private RangePredicate keySearchPred;
private ILSMIndexOperationContext opCtx;
+ private List<ILSMComponent> operationalComponents;
+
@Override
public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
LSMInvertedIndexSearchCursorInitialState lsmInitState = (LSMInvertedIndexSearchCursorInitialState) initialState;
harness = lsmInitState.getLSMHarness();
includeMemComponent = lsmInitState.getIncludeMemComponent();
- searcherRefCount = lsmInitState.getSearcherRefCount();
+ operationalComponents = lsmInitState.getOperationalComponents();
indexAccessors = lsmInitState.getIndexAccessors();
opCtx = lsmInitState.getOpContext();
accessorIndex = 0;
@@ -152,7 +153,7 @@
public void close() throws HyracksDataException {
reset();
accessorIndex = -1;
- harness.closeSearchCursor(searcherRefCount, includeMemComponent, opCtx);
+ harness.closeSearchCursor(operationalComponents, includeMemComponent, opCtx);
}
@Override
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursorInitialState.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursorInitialState.java
index 877544c..15fc769 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursorInitialState.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursorInitialState.java
@@ -16,7 +16,6 @@
package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
@@ -24,6 +23,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingTupleReference;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
@@ -31,7 +31,6 @@
public class LSMInvertedIndexSearchCursorInitialState implements ICursorInitialState {
private final boolean includeMemComponent;
- private final AtomicInteger searcherfRefCount;
private final ILSMHarness lsmHarness;
private final List<IIndexAccessor> indexAccessors;
private final List<IIndexAccessor> deletedKeysBTreeAccessors;
@@ -41,16 +40,18 @@
private final MultiComparator keyCmp;
private final PermutingTupleReference keysOnlyTuple;
+ private final List<ILSMComponent> operationalComponents;
+
public LSMInvertedIndexSearchCursorInitialState(final MultiComparator keyCmp,
PermutingTupleReference keysOnlyTuple, List<IIndexAccessor> indexAccessors,
List<IIndexAccessor> deletedKeysBTreeAccessors, IIndexOperationContext ctx, boolean includeMemComponent,
- AtomicInteger searcherfRefCount, ILSMHarness lsmHarness) {
+ ILSMHarness lsmHarness, List<ILSMComponent> operationalComponents) {
this.keyCmp = keyCmp;
this.keysOnlyTuple = keysOnlyTuple;
this.indexAccessors = indexAccessors;
this.deletedKeysBTreeAccessors = deletedKeysBTreeAccessors;
this.includeMemComponent = includeMemComponent;
- this.searcherfRefCount = searcherfRefCount;
+ this.operationalComponents = operationalComponents;
this.lsmHarness = lsmHarness;
this.ctx = (LSMInvertedIndexOpContext) ctx;
this.searchCallback = this.ctx.searchCallback;
@@ -65,12 +66,12 @@
public void setPage(ICachedPage page) {
}
- public List<IIndexAccessor> getIndexAccessors() {
- return indexAccessors;
+ public List<ILSMComponent> getOperationalComponents() {
+ return operationalComponents;
}
- public AtomicInteger getSearcherRefCount() {
- return searcherfRefCount;
+ public List<IIndexAccessor> getIndexAccessors() {
+ return indexAccessors;
}
public boolean getIncludeMemComponent() {
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index f1cab5e..13fc738 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -16,6 +16,7 @@
package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
import java.io.File;
+import java.util.ArrayList;
import java.util.List;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -49,6 +50,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentState;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
@@ -185,6 +187,30 @@
mutableComponent.getBTree().clear();
}
+ @Override
+ public List<ILSMComponent> getOperationalComponents(IIndexOperationContext ctx) {
+ List<ILSMComponent> operationalComponents = new ArrayList<ILSMComponent>();
+ switch (ctx.getOperation()) {
+ case SEARCH:
+ // TODO: We should add the mutable component at some point.
+ operationalComponents.addAll(immutableComponents);
+ break;
+ case MERGE:
+ // TODO: determining the participating components in a merge should probably the task of the merge policy.
+ if (immutableComponents.size() > 1) {
+ for (ILSMComponent c : immutableComponents) {
+ if (c.negativeCompareAndSet(LSMComponentState.MERGING, LSMComponentState.MERGING)) {
+ operationalComponents.add(c);
+ }
+ }
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
+ }
+ return operationalComponents;
+ }
+
protected LSMComponentFileReferences getMergeTargetFileName(List<ILSMComponent> mergingDiskComponents)
throws HyracksDataException {
RTree firstTree = ((LSMRTreeComponent) mergingDiskComponents.get(0)).getRTree();
@@ -341,4 +367,9 @@
InMemoryBufferCache memBufferCache = (InMemoryBufferCache) mutableComponent.getRTree().getBufferCache();
return memBufferCache.getNumPages() * memBufferCache.getPageSize();
}
+
+ @Override
+ public ILSMComponent getMutableComponent() {
+ return mutableComponent;
+ }
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index df6bcc8..2019d43 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -15,9 +15,9 @@
package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
-import java.util.concurrent.atomic.AtomicInteger;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
@@ -39,6 +39,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IInMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -148,8 +149,7 @@
@Override
public void search(IIndexCursor cursor, List<ILSMComponent> immutableComponents, ISearchPredicate pred,
- IIndexOperationContext ictx, boolean includeMutableComponent, AtomicInteger searcherRefCount)
- throws HyracksDataException, IndexException {
+ IIndexOperationContext ictx, boolean includeMutableComponent) throws HyracksDataException, IndexException {
LSMRTreeOpContext ctx = (LSMRTreeOpContext) ictx;
int numDiskComponents = immutableComponents.size();
int numTrees = (includeMutableComponent) ? numDiskComponents + 1 : numDiskComponents;
@@ -175,10 +175,15 @@
diskComponentIx++;
}
+ List<ILSMComponent> operationalComponents = new ArrayList<ILSMComponent>();
+ if (includeMutableComponent) {
+ operationalComponents.add(getMutableComponent());
+ }
+ operationalComponents.addAll(immutableComponents);
LSMRTreeCursorInitialState initialState = new LSMRTreeCursorInitialState(numTrees, rtreeLeafFrameFactory,
rtreeInteriorFrameFactory, btreeLeafFrameFactory, ctx.getBTreeMultiComparator(), rTreeAccessors,
- bTreeAccessors, searcherRefCount, includeMutableComponent, lsmHarness, comparatorFields,
- linearizerArray, ctx.searchCallback);
+ bTreeAccessors, includeMutableComponent, lsmHarness, comparatorFields, linearizerArray,
+ ctx.searchCallback, operationalComponents);
cursor.open(initialState, pred);
}
@@ -303,24 +308,12 @@
}
@Override
- public void cleanUpAfterMerge(List<ILSMComponent> mergedComponents) throws HyracksDataException {
- for (ILSMComponent c : mergedComponents) {
- LSMRTreeComponent component = (LSMRTreeComponent) c;
- BTree oldBTree = component.getBTree();
- oldBTree.deactivate();
- oldBTree.destroy();
- RTree oldRTree = component.getRTree();
- oldRTree.deactivate();
- oldRTree.destroy();
- }
- }
-
- @Override
public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException {
// Renaming order is critical because we use assume ordering when we
// read the file names when we open the tree.
// The RTree should be renamed before the BTree.
ILSMIndexOperationContext ctx = createOpContext();
+ ctx.setOperation(IndexOperation.MERGE);
ITreeIndexCursor cursor;
cursor = new LSMRTreeSortedCursor(ctx, linearizer);
ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java
index 4490473..e5af9ee 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java
@@ -1,5 +1,6 @@
package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -11,6 +12,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
@@ -45,6 +47,8 @@
protected boolean foundNext;
protected final ILSMIndexOperationContext opCtx;
+ protected List<ILSMComponent> operationalComponents;
+
public LSMRTreeAbstractCursor(ILSMIndexOperationContext opCtx) {
super();
this.opCtx = opCtx;
@@ -57,8 +61,8 @@
public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
LSMRTreeCursorInitialState lsmInitialState = (LSMRTreeCursorInitialState) initialState;
btreeCmp = lsmInitialState.getBTreeCmp();
- searcherRefCount = lsmInitialState.getSearcherRefCount();
includeMemRTree = lsmInitialState.getIncludeMemComponent();
+ operationalComponents = lsmInitialState.getOperationalComponents();
lsmHarness = lsmInitialState.getLSMHarness();
numberOfTrees = lsmInitialState.getNumberOfTrees();
diskRTreeAccessors = lsmInitialState.getRTreeAccessors();
@@ -101,7 +105,7 @@
rtreeCursors = null;
btreeCursors = null;
} finally {
- lsmHarness.closeSearchCursor(searcherRefCount, includeMemRTree, opCtx);
+ lsmHarness.closeSearchCursor(operationalComponents, includeMemRTree, opCtx);
}
open = false;
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeComponent.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeComponent.java
index e2b4db5..4b53055 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeComponent.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeComponent.java
@@ -15,12 +15,13 @@
package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentState;
+import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMComponent;
import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
-public class LSMRTreeComponent implements ILSMComponent {
+public class LSMRTreeComponent extends AbstractLSMComponent {
private final RTree rtree;
private final BTree btree;
@@ -31,39 +32,22 @@
}
@Override
- public void activate() {
- // TODO Auto-generated method stub
-
+ public void destroy() throws HyracksDataException {
+ rtree.deactivate();
+ rtree.destroy();
+ if (btree != null) {
+ btree.deactivate();
+ btree.destroy();
+ }
}
@Override
- public void deactivate() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void threadEnter() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void threadExit() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void setState(LSMComponentState state) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public LSMComponentState getState() {
- // TODO Auto-generated method stub
- return null;
+ public void reset() throws HyracksDataException {
+ ((InMemoryFreePageManager) rtree.getFreePageManager()).reset();
+ rtree.clear();
+ if (btree != null) {
+ btree.clear();
+ }
}
public RTree getRTree() {
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeCursorInitialState.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeCursorInitialState.java
index 4f98557..590d5d8 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeCursorInitialState.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeCursorInitialState.java
@@ -15,7 +15,7 @@
package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.List;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
@@ -23,31 +23,33 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
public class LSMRTreeCursorInitialState implements ICursorInitialState {
- private int numberOfTrees;
- private ITreeIndexFrameFactory rtreeInteriorFrameFactory;
- private ITreeIndexFrameFactory rtreeLeafFrameFactory;
- private ITreeIndexFrameFactory btreeLeafFrameFactory;
- private MultiComparator btreeCmp;
- private MultiComparator hilbertCmp;
- private ITreeIndexAccessor[] rTreeAccessors;
- private ITreeIndexAccessor[] bTreeAccessors;
- private AtomicInteger searcherRefCount;
+ private final int numberOfTrees;
+ private final ITreeIndexFrameFactory rtreeInteriorFrameFactory;
+ private final ITreeIndexFrameFactory rtreeLeafFrameFactory;
+ private final ITreeIndexFrameFactory btreeLeafFrameFactory;
+ private final MultiComparator btreeCmp;
+ private final MultiComparator hilbertCmp;
+ private final ITreeIndexAccessor[] rTreeAccessors;
+ private final ITreeIndexAccessor[] bTreeAccessors;
private final boolean includeMemRTree;
private final ILSMHarness lsmHarness;
private final int[] comparatorFields;
private ISearchOperationCallback searchCallback;
+ private final List<ILSMComponent> operationalComponents;
public LSMRTreeCursorInitialState(int numberOfTrees, ITreeIndexFrameFactory rtreeLeafFrameFactory,
ITreeIndexFrameFactory rtreeInteriorFrameFactory, ITreeIndexFrameFactory btreeLeafFrameFactory,
MultiComparator btreeCmp, ITreeIndexAccessor[] rTreeAccessors, ITreeIndexAccessor[] bTreeAccessors,
- AtomicInteger searcherRefCount, boolean includeMemRTree, ILSMHarness lsmHarness, int[] comparatorFields,
- IBinaryComparatorFactory[] linearizerArray, ISearchOperationCallback searchCallback) {
+ boolean includeMemRTree, ILSMHarness lsmHarness, int[] comparatorFields,
+ IBinaryComparatorFactory[] linearizerArray, ISearchOperationCallback searchCallback,
+ List<ILSMComponent> operationalComponents) {
this.numberOfTrees = numberOfTrees;
this.rtreeLeafFrameFactory = rtreeLeafFrameFactory;
this.rtreeInteriorFrameFactory = rtreeInteriorFrameFactory;
@@ -55,12 +57,12 @@
this.btreeCmp = btreeCmp;
this.rTreeAccessors = rTreeAccessors;
this.bTreeAccessors = bTreeAccessors;
- this.searcherRefCount = searcherRefCount;
this.includeMemRTree = includeMemRTree;
this.lsmHarness = lsmHarness;
this.comparatorFields = comparatorFields;
this.hilbertCmp = MultiComparator.create(linearizerArray);
this.searchCallback = searchCallback;
+ this.operationalComponents = operationalComponents;
}
public MultiComparator getHilbertCmp() {
@@ -100,6 +102,10 @@
public void setPage(ICachedPage page) {
}
+ public List<ILSMComponent> getOperationalComponents() {
+ return operationalComponents;
+ }
+
public ITreeIndexAccessor[] getRTreeAccessors() {
return rTreeAccessors;
}
@@ -112,10 +118,6 @@
return includeMemRTree;
}
- public AtomicInteger getSearcherRefCount() {
- return searcherRefCount;
- }
-
public ILSMHarness getLSMHarness() {
return lsmHarness;
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
index 73f5f80..53f467b 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
@@ -48,7 +48,7 @@
rtreeCursors = null;
btreeCursors = null;
} finally {
- lsmHarness.closeSearchCursor(searcherRefCount, includeMemRTree, opCtx);
+ lsmHarness.closeSearchCursor(operationalComponents, includeMemRTree, opCtx);
}
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
index 7caff33..9a4db5f 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
@@ -30,7 +30,8 @@
private boolean[] depletedRtreeCursors;
private int foundIn = -1;
- public LSMRTreeSortedCursor(ILSMIndexOperationContext opCtx, ILinearizeComparatorFactory linearizer) throws HyracksDataException {
+ public LSMRTreeSortedCursor(ILSMIndexOperationContext opCtx, ILinearizeComparatorFactory linearizer)
+ throws HyracksDataException {
super(opCtx);
this.linearizeCmp = linearizer.createBinaryComparator();
reset();
@@ -56,7 +57,7 @@
}
} finally {
if (open) {
- lsmHarness.closeSearchCursor(searcherRefCount, includeMemRTree, opCtx);
+ lsmHarness.closeSearchCursor(operationalComponents, includeMemRTree, opCtx);
}
}
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 52eb812..99eed2c 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -15,9 +15,9 @@
package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
-import java.util.concurrent.atomic.AtomicInteger;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
@@ -39,6 +39,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IInMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -145,28 +146,32 @@
@Override
public void search(IIndexCursor cursor, List<ILSMComponent> immutableComponents, ISearchPredicate pred,
- IIndexOperationContext ictx, boolean includemutableComponent, AtomicInteger searcherRefCount)
- throws HyracksDataException, IndexException {
+ IIndexOperationContext ictx, boolean includeMutableComponent) throws HyracksDataException, IndexException {
LSMRTreeOpContext ctx = (LSMRTreeOpContext) ictx;
LSMRTreeWithAntiMatterTuplesSearchCursor lsmTreeCursor = (LSMRTreeWithAntiMatterTuplesSearchCursor) cursor;
int numDiskRComponents = immutableComponents.size();
LSMRTreeCursorInitialState initialState;
ITreeIndexAccessor[] bTreeAccessors = null;
- if (includemutableComponent) {
+ if (includeMutableComponent) {
// Only in-memory BTree
bTreeAccessors = new ITreeIndexAccessor[1];
bTreeAccessors[0] = ctx.memBTreeAccessor;
}
+ List<ILSMComponent> operationalComponents = new ArrayList<ILSMComponent>();
+ if (includeMutableComponent) {
+ operationalComponents.add(getMutableComponent());
+ }
+ operationalComponents.addAll(immutableComponents);
initialState = new LSMRTreeCursorInitialState(numDiskRComponents, rtreeLeafFrameFactory,
rtreeInteriorFrameFactory, btreeLeafFrameFactory, ctx.getBTreeMultiComparator(), null, bTreeAccessors,
- searcherRefCount, includemutableComponent, lsmHarness, comparatorFields, linearizerArray,
- ctx.searchCallback);
+ includeMutableComponent, lsmHarness, comparatorFields, linearizerArray, ctx.searchCallback,
+ operationalComponents);
lsmTreeCursor.open(initialState, pred);
- if (includemutableComponent) {
+ if (includeMutableComponent) {
// Open cursor of in-memory RTree
ctx.memRTreeAccessor.search(lsmTreeCursor.getMemRTreeCursor(), pred);
}
@@ -309,15 +314,6 @@
}
@Override
- public void cleanUpAfterMerge(List<ILSMComponent> mergedComponents) throws HyracksDataException {
- for (ILSMComponent c : mergedComponents) {
- RTree oldRTree = (RTree) ((LSMRTreeComponent) c).getRTree();
- oldRTree.deactivate();
- oldRTree.destroy();
- }
- }
-
- @Override
public IIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
ISearchOperationCallback searchCallback) {
return new LSMRTreeWithAntiMatterTuplesAccessor(lsmHarness, createOpContext());
@@ -405,6 +401,7 @@
@Override
public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException {
LSMRTreeOpContext ctx = createOpContext();
+ ctx.setOperation(IndexOperation.MERGE);
ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor(ctx);
ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
// Ordered scan, ignoring the in-memory RTree.
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
index f5277cf..f9dd819 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
@@ -48,7 +48,7 @@
public LSMRTreeWithAntiMatterTuplesSearchCursor(ILSMIndexOperationContext opCtx) {
super(opCtx);
}
-
+
public void initPriorityQueue() throws HyracksDataException, IndexException {
int pqInitSize = (rangeCursors.length > 0) ? rangeCursors.length : 1;
outputPriorityQueue = new PriorityQueue<PriorityQueueElement>(pqInitSize, pqCmp);
@@ -71,6 +71,7 @@
.getRTreeLeafFrameFactory().createFrame());
}
includeMemComponent = lsmInitialState.getIncludeMemComponent();
+ operationalComponents = lsmInitialState.getOperationalComponents();
if (includeMemComponent) {
memRTreeCursor = new RTreeSearchCursor((IRTreeInteriorFrame) lsmInitialState.getRTreeInteriorFrameFactory()
.createFrame(), (IRTreeLeafFrame) lsmInitialState.getRTreeLeafFrameFactory().createFrame());
@@ -79,7 +80,6 @@
memBTreeAccessor = lsmInitialState.getBTreeAccessors()[0];
btreeRangePredicate = new RangePredicate(null, null, true, true, btreeCmp, btreeCmp);
}
- searcherRefCount = lsmInitialState.getSearcherRefCount();
lsmHarness = lsmInitialState.getLSMHarness();
comparatorFields = lsmInitialState.getComparatorFields();
setPriorityQueueComparator();
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexMultiThreadTest.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexMultiThreadTest.java
index d1d2188..f739d33 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexMultiThreadTest.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexMultiThreadTest.java
@@ -81,8 +81,8 @@
// 4 batches per thread.
int batchSize = (NUM_OPERATIONS / numThreads) / 4;
- IndexMultiThreadTestDriver driver = new IndexMultiThreadTestDriver(index, workerFactory, fieldSerdes,
- conf.ops, conf.opProbs);
+ IndexMultiThreadTestDriver driver = new IndexMultiThreadTestDriver(index, workerFactory, fieldSerdes, conf.ops,
+ conf.opProbs);
driver.init();
long[] times = driver.run(numThreads, 1, NUM_OPERATIONS, batchSize);
index.validate();
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/AbstractIndexTestWorker.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/AbstractIndexTestWorker.java
index d07ae86..f9ff26a 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/AbstractIndexTestWorker.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/AbstractIndexTestWorker.java
@@ -35,8 +35,7 @@
protected final IIndexAccessor indexAccessor;
- public AbstractIndexTestWorker(DataGenThread dataGen, TestOperationSelector opSelector, IIndex index,
- int numBatches) {
+ public AbstractIndexTestWorker(DataGenThread dataGen, TestOperationSelector opSelector, IIndex index, int numBatches) {
this.dataGen = dataGen;
this.opSelector = opSelector;
this.numBatches = numBatches;