- Added LSM-RTree merge operation with correct concurrency managment.
- Bug Fixes and code cleaning.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1083 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java
index 4bccba4..b06b7e7 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java
@@ -6,6 +6,8 @@
import java.util.concurrent.LinkedBlockingQueue;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
/**
@@ -70,6 +72,18 @@
} else {
return new IntegerFieldValueGenerator(rnd);
}
+ } else if (serde instanceof FloatSerializerDeserializer) {
+ if (sorted) {
+ return new SortedFloatFieldValueGenerator();
+ } else {
+ return new FloatFieldValueGenerator(rnd);
+ }
+ } else if (serde instanceof DoubleSerializerDeserializer) {
+ if (sorted) {
+ return new SortedDoubleFieldValueGenerator();
+ } else {
+ return new DoubleFieldValueGenerator(rnd);
+ }
}
System.out.println("NULL");
//if (serde instanceof Integer64SerializerDeserializer) {
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DoubleFieldValueGenerator.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DoubleFieldValueGenerator.java
new file mode 100644
index 0000000..fcac93a
--- /dev/null
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DoubleFieldValueGenerator.java
@@ -0,0 +1,16 @@
+package edu.uci.ics.hyracks.storage.am.common.datagen;
+
+import java.util.Random;
+
+public class DoubleFieldValueGenerator implements IFieldValueGenerator<Double> {
+ protected final Random rnd;
+
+ public DoubleFieldValueGenerator(Random rnd) {
+ this.rnd = rnd;
+ }
+
+ @Override
+ public Double next() {
+ return rnd.nextDouble();
+ }
+}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/FloatFieldValueGenerator.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/FloatFieldValueGenerator.java
new file mode 100644
index 0000000..6f21c77
--- /dev/null
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/FloatFieldValueGenerator.java
@@ -0,0 +1,16 @@
+package edu.uci.ics.hyracks.storage.am.common.datagen;
+
+import java.util.Random;
+
+public class FloatFieldValueGenerator implements IFieldValueGenerator<Float> {
+ protected final Random rnd;
+
+ public FloatFieldValueGenerator(Random rnd) {
+ this.rnd = rnd;
+ }
+
+ @Override
+ public Float next() {
+ return rnd.nextFloat();
+ }
+}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/SortedDoubleFieldValueGenerator.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/SortedDoubleFieldValueGenerator.java
new file mode 100644
index 0000000..4193811
--- /dev/null
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/SortedDoubleFieldValueGenerator.java
@@ -0,0 +1,17 @@
+package edu.uci.ics.hyracks.storage.am.common.datagen;
+
+public class SortedDoubleFieldValueGenerator implements IFieldValueGenerator<Double> {
+ private double val = 0.0d;
+
+ public SortedDoubleFieldValueGenerator() {
+ }
+
+ public SortedDoubleFieldValueGenerator(double startVal) {
+ val = startVal;
+ }
+
+ @Override
+ public Double next() {
+ return val++;
+ }
+}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/SortedFloatFieldValueGenerator.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/SortedFloatFieldValueGenerator.java
new file mode 100644
index 0000000..1f6b315
--- /dev/null
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/SortedFloatFieldValueGenerator.java
@@ -0,0 +1,17 @@
+package edu.uci.ics.hyracks.storage.am.common.datagen;
+
+public class SortedFloatFieldValueGenerator implements IFieldValueGenerator<Float> {
+ private float val = 0.0f;
+
+ public SortedFloatFieldValueGenerator() {
+ }
+
+ public SortedFloatFieldValueGenerator(float startVal) {
+ val = startVal;
+ }
+
+ @Override
+ public Float next() {
+ return val++;
+ }
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/IExperimentRunner.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/IExperimentRunner.java
new file mode 100644
index 0000000..8b9d6f3
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/IExperimentRunner.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import edu.uci.ics.hyracks.storage.am.common.datagen.DataGenThread;
+
+public interface IExperimentRunner {
+ public static int DEFAULT_MAX_OUTSTANDING = 100000;
+
+ public void init() throws Exception;
+
+ public long runExperiment(DataGenThread dataGen, int numThreads) throws Exception;
+
+ public void reset() throws Exception;
+
+ public void deinit() throws Exception;
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeBulkLoadContext.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeBulkLoadContext.java
deleted file mode 100644
index 727c211..0000000
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeBulkLoadContext.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoadContext;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
-import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
-
-public class LSMTreeBulkLoadContext implements IIndexBulkLoadContext {
- private final ITreeIndex tree;
- private IIndexBulkLoadContext bulkLoadCtx;
-
- public LSMTreeBulkLoadContext(ITreeIndex tree) {
- this.tree = tree;
- }
-
- public void beginBulkLoad(float fillFactor) throws HyracksDataException, TreeIndexException {
- bulkLoadCtx = tree.beginBulkLoad(fillFactor);
- }
-
- public ITreeIndex getTree() {
- return tree;
- }
-
- public IIndexBulkLoadContext getBulkLoadCtx() {
- return bulkLoadCtx;
- }
-}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 7b59f1b..5a9c454 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -2,17 +2,19 @@
import java.io.File;
import java.io.FilenameFilter;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedList;
+import java.util.List;
import java.util.ListIterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoadContext;
@@ -28,17 +30,16 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileNameManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMTree;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeBulkLoadContext;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeFactory;
import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
-import edu.uci.ics.hyracks.storage.am.rtree.impls.RTreeSearchCursor;
import edu.uci.ics.hyracks.storage.am.rtree.impls.SearchPredicate;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
public class LSMRTree implements ILSMTree {
+ private static final long AFTER_MERGE_CLEANUP_SLEEP = 100;
// In-memory components.
private RTree memRTree;
@@ -53,12 +54,8 @@
private final BTreeFactory diskBTreeFactory;
private final IBufferCache diskBufferCache;
private final IFileMapProvider diskFileMapProvider;
- private LinkedList<RTree> onDiskRTrees = new LinkedList<RTree>();
- private LinkedList<RTree> mergedRTrees = new LinkedList<RTree>();
- private LinkedList<BTree> onDiskBTrees = new LinkedList<BTree>();
- private LinkedList<BTree> mergedBTrees = new LinkedList<BTree>();
- private int onDiskRTreeCount;
- private int onDiskBTreeCount;
+ private LinkedList<RTree> diskRTrees = new LinkedList<RTree>();
+ private LinkedList<BTree> diskBTrees = new LinkedList<BTree>();
// Common for in-memory and on-disk components.
private final ITreeIndexFrameFactory rtreeInteriorFrameFactory;
@@ -71,6 +68,15 @@
private int threadRefCount;
private boolean flushFlag;
+ // For synchronizing searchers with a concurrent merge.
+ private AtomicBoolean isMerging = new AtomicBoolean(false);
+ private AtomicInteger searcherRefCountA = new AtomicInteger(0);
+ private AtomicInteger searcherRefCountB = new AtomicInteger(0);
+ // Represents the current number of searcher threads that are operating on
+ // the unmerged on-disk RTrees and BTrees.
+ // We alternate between searcherRefCountA and searcherRefCountB.
+ private AtomicInteger searcherRefCount = searcherRefCountA;
+
public LSMRTree(IBufferCache memBufferCache, InMemoryFreePageManager memFreePageManager,
ITreeIndexFrameFactory rtreeInteriorFrameFactory, ITreeIndexFrameFactory rtreeLeafFrameFactory,
ITreeIndexFrameFactory btreeInteriorFrameFactory, ITreeIndexFrameFactory btreeLeafFrameFactory,
@@ -93,8 +99,6 @@
this.diskRTreeFactory = diskRTreeFactory;
this.diskBTreeFactory = diskBTreeFactory;
this.btreeCmp = btreeCmp;
- this.onDiskRTreeCount = 0;
- this.onDiskBTreeCount = 0;
this.threadRefCount = 0;
this.flushFlag = false;
this.fileNameManager = fileNameManager;
@@ -103,77 +107,6 @@
}
@Override
- public ITreeIndexAccessor createAccessor() {
- return new LSMRTreeAccessor(this);
- }
-
- @Override
- public IIndexBulkLoadContext beginBulkLoad(float fillFactor) throws TreeIndexException, HyracksDataException {
- // Note that by using a flush target file name, we state that the new
- // bulk loaded tree is "newer" than any other merged tree.
-
- String fileName = fileNameManager.getFlushFileName();
- RTree diskRTree = (RTree) createDiskTree(fileName + "-rtree", diskRTreeFactory, true);
- // For each RTree, we require to have a buddy BTree. thus, we create an
- // empty BTree. This can be optimized later.
- createDiskTree(fileName + "-btree", diskBTreeFactory, true);
- LSMTreeBulkLoadContext bulkLoadCtx = new LSMTreeBulkLoadContext(diskRTree);
- bulkLoadCtx.beginBulkLoad(fillFactor);
- return bulkLoadCtx;
- }
-
- @Override
- public void bulkLoadAddTuple(ITupleReference tuple, IIndexBulkLoadContext ictx) throws HyracksDataException {
- LSMTreeBulkLoadContext bulkLoadCtx = (LSMTreeBulkLoadContext) ictx;
- bulkLoadCtx.getTree().bulkLoadAddTuple(tuple, bulkLoadCtx.getBulkLoadCtx());
-
- }
-
- @Override
- public void endBulkLoad(IIndexBulkLoadContext ictx) throws HyracksDataException {
- LSMTreeBulkLoadContext bulkLoadCtx = (LSMTreeBulkLoadContext) ictx;
- bulkLoadCtx.getTree().endBulkLoad(bulkLoadCtx.getBulkLoadCtx());
- onDiskRTrees.addFirst((RTree) bulkLoadCtx.getTree());
-
- }
-
- @Override
- public ITreeIndexFrameFactory getLeafFrameFactory() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public ITreeIndexFrameFactory getInteriorFrameFactory() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public IFreePageManager getFreePageManager() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public int getFieldCount() {
- // TODO Auto-generated method stub
- return 0;
- }
-
- @Override
- public int getRootPageId() {
- // TODO Auto-generated method stub
- return 0;
- }
-
- @Override
- public IndexType getIndexType() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
public void create(int indexFileId) throws HyracksDataException {
memRTree.create(rtreeFileId);
memBTree.create(btreeFileId);
@@ -217,210 +150,108 @@
if (rtreeFiles == null || btreeFiles == null) {
return;
}
-
+
Comparator<String> fileNameCmp = fileNameManager.getFileNameComparator();
Arrays.sort(rtreeFiles, fileNameCmp);
for (String fileName : rtreeFiles) {
RTree rtree = (RTree) createDiskTree(fileName, diskRTreeFactory, false);
- onDiskRTrees.add(rtree);
+ diskRTrees.add(rtree);
}
Arrays.sort(btreeFiles, fileNameCmp);
for (String fileName : btreeFiles) {
BTree btree = (BTree) createDiskTree(fileName, diskBTreeFactory, false);
- onDiskBTrees.add(btree);
+ diskBTrees.add(btree);
}
}
@Override
public void close() throws HyracksDataException {
- for (RTree rtree : onDiskRTrees) {
+ for (RTree rtree : diskRTrees) {
diskBufferCache.closeFile(rtree.getFileId());
rtree.close();
}
- for (BTree btree : onDiskBTrees) {
+ for (BTree btree : diskBTrees) {
diskBufferCache.closeFile(btree.getFileId());
btree.close();
}
- onDiskRTrees.clear();
- onDiskBTrees.clear();
- onDiskRTreeCount = 0;
- onDiskBTreeCount = 0;
+ diskRTrees.clear();
+ diskBTrees.clear();
memRTree.close();
memBTree.close();
}
- private ITreeIndex createDiskTree(String fileName, TreeFactory diskTreeFactory, boolean createTree)
- throws HyracksDataException {
- // Register the new tree file.
- FileReference file = new FileReference(new File(fileName));
- // TODO: Delete the file during cleanup.
- diskBufferCache.createFile(file);
- int diskTreeFileId = diskFileMapProvider.lookupFileId(file);
- // TODO: Close the file during cleanup.
- diskBufferCache.openFile(diskTreeFileId);
- // Create new tree instance.
- ITreeIndex diskTree = diskTreeFactory.createIndexInstance(diskTreeFileId);
- if (createTree) {
- diskTree.create(diskTreeFileId);
- }
- // TODO: Close the tree during cleanup.
- diskTree.open(diskTreeFileId);
- return diskTree;
+ @Override
+ public ITreeIndexAccessor createAccessor() {
+ return new LSMRTreeAccessor(this);
}
@Override
- public void merge() throws Exception {
-
- // Cursor setting -- almost the same as search, only difference is
- // "no cursor for in-memory tree"
- int numberOfInDiskTrees = onDiskRTrees.size();
-
- RTreeSearchCursor[] rtreeCursors = new RTreeSearchCursor[numberOfInDiskTrees];
- BTreeRangeSearchCursor[] btreeCursors = new BTreeRangeSearchCursor[numberOfInDiskTrees];
-
- for (int i = 0; i < numberOfInDiskTrees; i++) {
- rtreeCursors[i] = new RTreeSearchCursor((IRTreeInteriorFrame) rtreeInteriorFrameFactory.createFrame(),
- (IRTreeLeafFrame) rtreeLeafFrameFactory.createFrame());
-
- btreeCursors[i] = new BTreeRangeSearchCursor((IBTreeLeafFrame) btreeLeafFrameFactory.createFrame(), false);
- }
-
- String fileName = fileNameManager.getMergeFileName();
- RTree mergedRTree = (RTree) createDiskTree(fileName + "-rtree", diskRTreeFactory, true);
- BTree mergedBTree = (BTree) createDiskTree(fileName + "-btree", diskBTreeFactory, true);
-
- // BulkLoad the tuples from the trees into the new merged trees.
- IIndexBulkLoadContext rtreeBulkLoadCtx = mergedRTree.beginBulkLoad(1.0f);
- IIndexBulkLoadContext btreeBulkLoadCtx = mergedBTree.beginBulkLoad(1.0f);
-
- for (int i = 0; i < numberOfInDiskTrees; i++) {
-
- // scan the RTrees
- ITreeIndexCursor rtreeScanCursor = new RTreeSearchCursor(
- (IRTreeInteriorFrame) rtreeInteriorFrameFactory.createFrame(),
- (IRTreeLeafFrame) rtreeLeafFrameFactory.createFrame());
- SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
-
- ITreeIndexAccessor onDiskRTreeAccessor = onDiskRTrees.get(i).createAccessor();
- onDiskRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
-
- try {
- while (rtreeScanCursor.hasNext()) {
- rtreeScanCursor.next();
- ITupleReference frameTuple = rtreeScanCursor.getTuple();
- mergedRTree.bulkLoadAddTuple(frameTuple, rtreeBulkLoadCtx);
- }
- } finally {
- rtreeScanCursor.close();
- }
-
- // scan the BTrees
- ITreeIndexCursor btreeScanCursor = new BTreeRangeSearchCursor(
- (IBTreeLeafFrame) btreeLeafFrameFactory.createFrame(), false);
- RangePredicate btreeNullPredicate = new RangePredicate(true, null, null, true, true, null, null);
- ITreeIndexAccessor onDiskBTreeAccessor = onDiskBTrees.get(i).createAccessor();
- onDiskBTreeAccessor.search(btreeScanCursor, btreeNullPredicate);
-
- try {
- while (btreeScanCursor.hasNext()) {
- btreeScanCursor.next();
- ITupleReference frameTuple = btreeScanCursor.getTuple();
- mergedBTree.bulkLoadAddTuple(frameTuple, btreeBulkLoadCtx);
- }
- } finally {
- btreeScanCursor.close();
- }
-
- }
- mergedRTree.endBulkLoad(rtreeBulkLoadCtx);
- mergedBTree.endBulkLoad(btreeBulkLoadCtx);
-
- // TODO: complete the merge code
-
- }
-
- @Override
- public void flush() throws HyracksDataException, TreeIndexException {
-
- // scan the RTree
- ITreeIndexCursor rtreeScanCursor = new RTreeSearchCursor(
- (IRTreeInteriorFrame) rtreeInteriorFrameFactory.createFrame(),
- (IRTreeLeafFrame) rtreeLeafFrameFactory.createFrame());
- SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
-
- ITreeIndexAccessor memRTreeAccessor = memRTree.createAccessor();
- memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
+ public IIndexBulkLoadContext beginBulkLoad(float fillFactor) throws TreeIndexException, HyracksDataException {
+ // Note that by using a flush target file name, we state that the new
+ // bulk loaded tree is "newer" than any other merged tree.
String fileName = fileNameManager.getFlushFileName();
RTree diskRTree = (RTree) createDiskTree(fileName + "-rtree", diskRTreeFactory, true);
-
- // BulkLoad the tuples from the in-memory tree into the new disk RTree.
- IIndexBulkLoadContext rtreeBulkLoadCtx = diskRTree.beginBulkLoad(1.0f);
-
- try {
- while (rtreeScanCursor.hasNext()) {
- rtreeScanCursor.next();
- ITupleReference frameTuple = rtreeScanCursor.getTuple();
- diskRTree.bulkLoadAddTuple(frameTuple, rtreeBulkLoadCtx);
- }
- } finally {
- rtreeScanCursor.close();
- }
- diskRTree.endBulkLoad(rtreeBulkLoadCtx);
-
- // scan the BTree
- ITreeIndexCursor btreeScanCursor = new BTreeRangeSearchCursor(
- (IBTreeLeafFrame) btreeLeafFrameFactory.createFrame(), false);
- RangePredicate btreeNullPredicate = new RangePredicate(true, null, null, true, true, null, null);
- ITreeIndexAccessor memBTreeAccessor = memBTree.createAccessor();
- memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate);
-
+ // For each RTree, we require to have a buddy BTree. thus, we create an
+ // empty BTree. This can be optimized later.
BTree diskBTree = (BTree) createDiskTree(fileName + "-btree", diskBTreeFactory, true);
+ LSMRTreeBulkLoadContext bulkLoadCtx = new LSMRTreeBulkLoadContext(diskRTree, diskBTree);
+ bulkLoadCtx.beginBulkLoad(fillFactor);
+ return bulkLoadCtx;
+ }
- // BulkLoad the tuples from the in-memory tree into the new disk BTree.
- IIndexBulkLoadContext btreeBulkLoadCtx = diskBTree.beginBulkLoad(1.0f);
- try {
- while (btreeScanCursor.hasNext()) {
- btreeScanCursor.next();
- ITupleReference frameTuple = btreeScanCursor.getTuple();
- diskBTree.bulkLoadAddTuple(frameTuple, btreeBulkLoadCtx);
- }
- } finally {
- btreeScanCursor.close();
+ @Override
+ public void bulkLoadAddTuple(ITupleReference tuple, IIndexBulkLoadContext ictx) throws HyracksDataException {
+ LSMRTreeBulkLoadContext bulkLoadCtx = (LSMRTreeBulkLoadContext) ictx;
+ bulkLoadCtx.getRTree().bulkLoadAddTuple(tuple, bulkLoadCtx.getBulkLoadCtx());
+
+ }
+
+ @Override
+ public void endBulkLoad(IIndexBulkLoadContext ictx) throws HyracksDataException {
+ LSMRTreeBulkLoadContext bulkLoadCtx = (LSMRTreeBulkLoadContext) ictx;
+ bulkLoadCtx.getRTree().endBulkLoad(bulkLoadCtx.getBulkLoadCtx());
+ synchronized (diskRTrees) {
+ diskRTrees.addFirst(bulkLoadCtx.getRTree());
+ diskBTrees.addFirst(bulkLoadCtx.getBTree());
}
- diskBTree.endBulkLoad(btreeBulkLoadCtx);
-
- resetInMemoryTrees();
-
- onDiskRTrees.addFirst(diskRTree);
- onDiskBTrees.addFirst(diskBTree);
-
}
- public void resetInMemoryTrees() throws HyracksDataException {
- memFreePageManager.reset();
- memRTree.create(rtreeFileId);
- memBTree.create(btreeFileId);
+ @Override
+ public ITreeIndexFrameFactory getLeafFrameFactory() {
+ // TODO Auto-generated method stub
+ return null;
}
- public void threadEnter() {
- threadRefCount++;
+ @Override
+ public ITreeIndexFrameFactory getInteriorFrameFactory() {
+ // TODO Auto-generated method stub
+ return null;
}
- public void threadExit() throws HyracksDataException, TreeIndexException {
- synchronized (this) {
- threadRefCount--;
- // Check if we've reached or exceeded the maximum number of pages.
- if (!flushFlag && memFreePageManager.isFull()) {
- flushFlag = true;
- }
- // Flush will only be handled by last exiting thread.
- if (flushFlag && threadRefCount == 0) {
- flush();
- flushFlag = false;
- }
- }
+ @Override
+ public IFreePageManager getFreePageManager() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public int getFieldCount() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public int getRootPageId() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public IndexType getIndexType() {
+ // TODO Auto-generated method stub
+ return null;
}
private void insert(ITupleReference tuple, LSMRTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
@@ -459,25 +290,50 @@
}
}
- private void search(ITreeIndexCursor cursor, ISearchPredicate rtreeSearchPred, LSMRTreeOpContext ctx,
- boolean includeMemTree) throws Exception {
-
- boolean continuePerformOp = false;
- ctx.reset(IndexOp.SEARCH);
- while (continuePerformOp == false) {
- synchronized (this) {
- if (!flushFlag) {
- threadRefCount++;
- continuePerformOp = true;
+ private Pair<List<RTree>, List<BTree>> search(ITreeIndexCursor cursor, ISearchPredicate rtreeSearchPred,
+ LSMRTreeOpContext ctx, boolean includeMemRTree) throws HyracksDataException, TreeIndexException {
+ // If the search doesn't include the in-memory RTree, then we don't have
+ // to synchronize with a flush.
+ if (includeMemRTree) {
+ boolean waitForFlush = true;
+ do {
+ synchronized (this) {
+ if (!flushFlag) {
+ // The corresponding threadExit() is in
+ // LSMTreeRangeSearchCursor.close().
+ threadEnter();
+ waitForFlush = false;
+ }
}
+ } while (waitForFlush);
+ }
+
+ // Get a snapshot of the current on-disk RTrees and BTrees.
+ // If includeMemRTree is true, then no concurrent
+ // flush can add another on-disk RTree (due to threadEnter());
+ // If includeMemRTree is false, then it is possible that a concurrent
+ // flush adds another on-disk RTree.
+ // Since this mode is only used for merging trees, it doesn't really
+ // matter if the merge excludes the new on-disk RTree.
+ List<RTree> diskRTreesSnapshot = new ArrayList<RTree>();
+ List<BTree> diskBTreesSnapshot = new ArrayList<BTree>();
+ AtomicInteger localSearcherRefCount = null;
+ synchronized (diskRTrees) {
+ diskRTreesSnapshot.addAll(diskRTrees);
+ diskBTreesSnapshot.addAll(diskBTrees);
+ // Only remember the search ref count when performing a merge (i.e.,
+ // includeMemRTree is false).
+ if (!includeMemRTree) {
+ localSearcherRefCount = searcherRefCount;
+ localSearcherRefCount.incrementAndGet();
}
}
- int numDiskTrees = onDiskRTrees.size();
+ int numDiskTrees = diskRTreesSnapshot.size();
ITreeIndexAccessor[] bTreeAccessors;
int diskBTreeIx = 0;
- if (includeMemTree) {
+ if (includeMemRTree) {
bTreeAccessors = new ITreeIndexAccessor[numDiskTrees + 1];
bTreeAccessors[0] = ctx.memBTreeAccessor;
diskBTreeIx++;
@@ -485,7 +341,7 @@
bTreeAccessors = new ITreeIndexAccessor[numDiskTrees];
}
- ListIterator<BTree> diskBTreesIter = onDiskBTrees.listIterator();
+ ListIterator<BTree> diskBTreesIter = diskBTreesSnapshot.listIterator();
while (diskBTreesIter.hasNext()) {
BTree diskBTree = diskBTreesIter.next();
bTreeAccessors[diskBTreeIx] = diskBTree.createAccessor();
@@ -494,11 +350,12 @@
LSMRTreeSearchCursor lsmRTreeCursor = (LSMRTreeSearchCursor) cursor;
LSMRTreeCursorInitialState initialState = new LSMRTreeCursorInitialState(numDiskTrees + 1,
- rtreeLeafFrameFactory, rtreeInteriorFrameFactory, btreeLeafFrameFactory, btreeCmp, bTreeAccessors, this);
+ rtreeLeafFrameFactory, rtreeInteriorFrameFactory, btreeLeafFrameFactory, btreeCmp, bTreeAccessors,
+ this, includeMemRTree, localSearcherRefCount);
lsmRTreeCursor.open(initialState, rtreeSearchPred);
int cursorIx = 1;
- if (includeMemTree) {
+ if (includeMemRTree) {
ctx.memRTreeAccessor.search(((LSMRTreeSearchCursor) lsmRTreeCursor).getCursor(0), rtreeSearchPred);
cursorIx = 1;
} else {
@@ -507,7 +364,7 @@
// Open cursors of on-disk RTrees
ITreeIndexAccessor[] diskRTreeAccessors = new ITreeIndexAccessor[numDiskTrees];
- ListIterator<RTree> diskRTreesIter = onDiskRTrees.listIterator();
+ ListIterator<RTree> diskRTreesIter = diskRTreesSnapshot.listIterator();
int diskRTreeIx = 0;
while (diskRTreesIter.hasNext()) {
@@ -517,11 +374,195 @@
cursorIx++;
diskRTreeIx++;
}
+ return new Pair<List<RTree>, List<BTree>>(diskRTreesSnapshot, diskBTreesSnapshot);
}
- public LinkedList<BTree> getOnDiskBTrees() {
- return onDiskBTrees;
+ private ITreeIndex createDiskTree(String fileName, TreeFactory diskTreeFactory, boolean createTree)
+ throws HyracksDataException {
+ // Register the new tree file.
+ FileReference file = new FileReference(new File(fileName));
+ // TODO: Delete the file during cleanup.
+ diskBufferCache.createFile(file);
+ int diskTreeFileId = diskFileMapProvider.lookupFileId(file);
+ // TODO: Close the file during cleanup.
+ diskBufferCache.openFile(diskTreeFileId);
+ // Create new tree instance.
+ ITreeIndex diskTree = diskTreeFactory.createIndexInstance(diskTreeFileId);
+ if (createTree) {
+ diskTree.create(diskTreeFileId);
+ }
+ // TODO: Close the tree during cleanup.
+ diskTree.open(diskTreeFileId);
+ return diskTree;
+ }
+
+ @Override
+ public void flush() throws HyracksDataException, TreeIndexException {
+
+ // scan the RTree
+ ITreeIndexAccessor memRTreeAccessor = memRTree.createAccessor();
+ ITreeIndexCursor rtreeScanCursor = memRTreeAccessor.createSearchCursor();
+ SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
+ memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
+
+ String fileName = fileNameManager.getFlushFileName();
+ RTree diskRTree = (RTree) createDiskTree(fileName + "-rtree", diskRTreeFactory, true);
+
+ // BulkLoad the tuples from the in-memory tree into the new disk RTree.
+ IIndexBulkLoadContext rtreeBulkLoadCtx = diskRTree.beginBulkLoad(1.0f);
+
+ try {
+ while (rtreeScanCursor.hasNext()) {
+ rtreeScanCursor.next();
+ ITupleReference frameTuple = rtreeScanCursor.getTuple();
+ diskRTree.bulkLoadAddTuple(frameTuple, rtreeBulkLoadCtx);
+ }
+ } finally {
+ rtreeScanCursor.close();
+ }
+ diskRTree.endBulkLoad(rtreeBulkLoadCtx);
+
+ // scan the BTree
+ ITreeIndexAccessor memBTreeAccessor = memBTree.createAccessor();
+ ITreeIndexCursor btreeScanCursor = memBTreeAccessor.createSearchCursor();
+ RangePredicate btreeNullPredicate = new RangePredicate(true, null, null, true, true, null, null);
+ memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate);
+
+ BTree diskBTree = (BTree) createDiskTree(fileName + "-btree", diskBTreeFactory, true);
+
+ // BulkLoad the tuples from the in-memory tree into the new disk BTree.
+ IIndexBulkLoadContext btreeBulkLoadCtx = diskBTree.beginBulkLoad(1.0f);
+ try {
+ while (btreeScanCursor.hasNext()) {
+ btreeScanCursor.next();
+ ITupleReference frameTuple = btreeScanCursor.getTuple();
+ diskBTree.bulkLoadAddTuple(frameTuple, btreeBulkLoadCtx);
+ }
+ } finally {
+ btreeScanCursor.close();
+ }
+ diskBTree.endBulkLoad(btreeBulkLoadCtx);
+
+ resetInMemoryTrees();
+
+ synchronized (diskRTrees) {
+ diskRTrees.addFirst(diskRTree);
+ diskBTrees.addFirst(diskBTree);
+ }
+ }
+
+ @Override
+ public void merge() throws HyracksDataException, TreeIndexException {
+ if (isMerging.get()) {
+ throw new TreeIndexException("Merge already in progress in LSM-RTree. Only one concurrent merge allowed.");
+ }
+ isMerging.set(true);
+
+ // Point to the current searcher ref count, so we can wait for it later
+ // (after we swap the searcher ref count).
+ AtomicInteger localSearcherRefCount = searcherRefCount;
+
+ LSMRTreeOpContext ctx = createOpContext();
+ ITreeIndexCursor cursor = new LSMRTreeSearchCursor();
+ SearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
+ // Scan the RTrees, ignoring the in-memory RTree.
+ Pair<List<RTree>, List<BTree>> mergingDiskTreesPair = search(cursor, rtreeSearchPred, ctx, false);
+ List<RTree> mergingDiskRTrees = mergingDiskTreesPair.getFirst();
+ List<BTree> mergingDiskBTrees = mergingDiskTreesPair.getSecond();
+
+ // Bulk load the tuples from all on-disk RTrees into the new RTree.
+ String fileName = fileNameManager.getMergeFileName();
+ RTree mergedRTree = (RTree) createDiskTree(fileName + "-rtree", diskRTreeFactory, true);
+ BTree mergedBTree = (BTree) createDiskTree(fileName + "-btree", diskBTreeFactory, true);
+
+ IIndexBulkLoadContext bulkLoadCtx = mergedRTree.beginBulkLoad(1.0f);
+ try {
+ while (cursor.hasNext()) {
+ cursor.next();
+ ITupleReference frameTuple = cursor.getTuple();
+ mergedRTree.bulkLoadAddTuple(frameTuple, bulkLoadCtx);
+ }
+ } finally {
+ cursor.close();
+ }
+ mergedRTree.endBulkLoad(bulkLoadCtx);
+
+ // Remove the old RTrees and BTrees from the list, and add the new
+ // merged RTree and an empty BTree
+ // Also, swap the searchRefCount.
+ synchronized (diskRTrees) {
+ diskRTrees.removeAll(mergingDiskRTrees);
+ diskRTrees.addLast(mergedRTree);
+
+ diskBTrees.removeAll(mergingDiskBTrees);
+ diskBTrees.addLast(mergedBTree);
+ // Swap the searcher ref count reference, and reset it to zero.
+ if (searcherRefCount == searcherRefCountA) {
+ searcherRefCount = searcherRefCountB;
+ } else {
+ searcherRefCount = searcherRefCountA;
+ }
+ searcherRefCount.set(0);
+ }
+
+ // Wait for all searchers that are still accessing the old on-disk
+ // RTrees and BTrees, then perform the final cleanup of the old RTrees
+ // and BTrees.
+ while (localSearcherRefCount.get() != 0) {
+ try {
+ Thread.sleep(AFTER_MERGE_CLEANUP_SLEEP);
+ } catch (InterruptedException e) {
+ // Propagate the exception to the caller, so that an appropriate
+ // cleanup action can be taken.
+ throw new HyracksDataException(e);
+ }
+ }
+
+ // Cleanup. At this point we have guaranteed that no searchers are
+ // touching the old on-disk RTrees and BTrees (localSearcherRefCount ==
+ // 0).
+ for (RTree oldRTree : mergingDiskRTrees) {
+ oldRTree.close();
+ FileReference fileRef = diskFileMapProvider.lookupFileName(oldRTree.getFileId());
+ diskBufferCache.closeFile(oldRTree.getFileId());
+ diskBufferCache.deleteFile(oldRTree.getFileId());
+ fileRef.getFile().delete();
+ }
+ for (BTree oldBTree : mergingDiskBTrees) {
+ oldBTree.close();
+ FileReference fileRef = diskFileMapProvider.lookupFileName(oldBTree.getFileId());
+ diskBufferCache.closeFile(oldBTree.getFileId());
+ diskBufferCache.deleteFile(oldBTree.getFileId());
+ fileRef.getFile().delete();
+ }
+ isMerging.set(false);
+
+ }
+
+ public void resetInMemoryTrees() throws HyracksDataException {
+ memFreePageManager.reset();
+ memRTree.create(rtreeFileId);
+ memBTree.create(btreeFileId);
+ }
+
+ public void threadEnter() {
+ threadRefCount++;
+ }
+
+ public void threadExit() throws HyracksDataException, TreeIndexException {
+ synchronized (this) {
+ threadRefCount--;
+ // Check if we've reached or exceeded the maximum number of pages.
+ if (!flushFlag && memFreePageManager.isFull()) {
+ flushFlag = true;
+ }
+ // Flush will only be handled by last exiting thread.
+ if (flushFlag && threadRefCount == 0) {
+ flush();
+ flushFlag = false;
+ }
+ }
}
private LSMRTreeOpContext createOpContext() {
@@ -582,7 +623,7 @@
public ITreeIndexCursor createDiskOrderScanCursor() {
throw new UnsupportedOperationException("DiskOrderScan not supported by LSM-RTree.");
}
-
+
@Override
public void diskOrderScan(ITreeIndexCursor cursor) throws HyracksDataException {
throw new UnsupportedOperationException("DiskOrderScan not supported by LSM-RTree.");
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoadContext.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoadContext.java
new file mode 100644
index 0000000..8fefd07
--- /dev/null
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoadContext.java
@@ -0,0 +1,34 @@
+package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoadContext;
+import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
+
+public class LSMRTreeBulkLoadContext implements IIndexBulkLoadContext {
+ private final RTree rtree;
+ private final BTree btree;
+ private IIndexBulkLoadContext bulkLoadCtx;
+
+ public LSMRTreeBulkLoadContext(RTree rtree, BTree btree) {
+ this.rtree = rtree;
+ this.btree = btree;
+ }
+
+ public void beginBulkLoad(float fillFactor) throws HyracksDataException, TreeIndexException {
+ bulkLoadCtx = rtree.beginBulkLoad(fillFactor);
+ }
+
+ public RTree getRTree() {
+ return rtree;
+ }
+
+ public BTree getBTree() {
+ return btree;
+ }
+
+ public IIndexBulkLoadContext getBulkLoadCtx() {
+ return bulkLoadCtx;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeCursorInitialState.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeCursorInitialState.java
index 7239f3a..aaca86a 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeCursorInitialState.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeCursorInitialState.java
@@ -1,5 +1,7 @@
package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+import java.util.concurrent.atomic.AtomicInteger;
+
import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
@@ -15,10 +17,13 @@
private MultiComparator btreeCmp;
private LSMRTree lsmRTree;
private ITreeIndexAccessor[] bTreeAccessors;
+ private final boolean includeMemRTree;
+ private final AtomicInteger searcherRefCount;
public LSMRTreeCursorInitialState(int numberOfTrees, ITreeIndexFrameFactory rtreeLeafFrameFactory,
ITreeIndexFrameFactory rtreeInteriorFrameFactory, ITreeIndexFrameFactory btreeLeafFrameFactory,
- MultiComparator btreeCmp, ITreeIndexAccessor[] bTreeAccessors, LSMRTree lsmRTree) {
+ MultiComparator btreeCmp, ITreeIndexAccessor[] bTreeAccessors, LSMRTree lsmRTree, boolean includeMemRTree,
+ AtomicInteger searcherRefCount) {
this.numberOfTrees = numberOfTrees;
this.rtreeLeafFrameFactory = rtreeLeafFrameFactory;
this.rtreeInteriorFrameFactory = rtreeInteriorFrameFactory;
@@ -26,6 +31,8 @@
this.btreeCmp = btreeCmp;
this.lsmRTree = lsmRTree;
this.bTreeAccessors = bTreeAccessors;
+ this.includeMemRTree = includeMemRTree;
+ this.searcherRefCount = searcherRefCount;
}
public int getNumberOfTrees() {
@@ -65,4 +72,12 @@
return lsmRTree;
}
+ public boolean getIncludeMemRTree() {
+ return includeMemRTree;
+ }
+
+ public AtomicInteger getSearcherRefCount() {
+ return searcherRefCount;
+ }
+
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
index 4cf18c4..e2642fb 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
@@ -2,6 +2,7 @@
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeOpContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
@@ -11,7 +12,7 @@
import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
import edu.uci.ics.hyracks.storage.am.rtree.impls.RTreeOpContext;
-public final class LSMRTreeOpContext {
+public final class LSMRTreeOpContext implements IIndexOpContext {
private RTreeOpContext rtreeOpContext;
private BTreeOpContext btreeOpContext;
@@ -39,4 +40,9 @@
}
}
+ @Override
+ public void reset() {
+
+ }
+
}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
index 98de361..bf9af21 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
@@ -1,5 +1,7 @@
package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+import java.util.concurrent.atomic.AtomicInteger;
+
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
@@ -28,6 +30,8 @@
private LSMRTree lsmRTree;
private RangePredicate btreeRangePredicate;
private ITupleReference frameTuple;
+ private boolean includeMemRTree;
+ private AtomicInteger searcherRefCount;
public LSMRTreeSearchCursor() {
currentCursror = 0;
@@ -86,25 +90,24 @@
@Override
public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
-
- lsmRTree = ((LSMRTreeCursorInitialState) initialState).getLsmRTree();
- btreeCmp = ((LSMRTreeCursorInitialState) initialState).getBTreeCmp();
-
- numberOfTrees = ((LSMRTreeCursorInitialState) initialState).getNumberOfTrees();
-
- diskBTreeAccessors = ((LSMRTreeCursorInitialState) initialState).getBTreeAccessors();
+ LSMRTreeCursorInitialState lsmInitialState = (LSMRTreeCursorInitialState) initialState;
+ lsmRTree = lsmInitialState.getLsmRTree();
+ btreeCmp = lsmInitialState.getBTreeCmp();
+ includeMemRTree = lsmInitialState.getIncludeMemRTree();
+ searcherRefCount = lsmInitialState.getSearcherRefCount();
+ numberOfTrees = lsmInitialState.getNumberOfTrees();
+ diskBTreeAccessors = lsmInitialState.getBTreeAccessors();
rtreeCursors = new RTreeSearchCursor[numberOfTrees];
btreeCursors = new BTreeRangeSearchCursor[numberOfTrees];
for (int i = 0; i < numberOfTrees; i++) {
- rtreeCursors[i] = new RTreeSearchCursor((IRTreeInteriorFrame) ((LSMRTreeCursorInitialState) initialState)
- .getRTreeInteriorFrameFactory().createFrame(),
- (IRTreeLeafFrame) ((LSMRTreeCursorInitialState) initialState).getRTreeLeafFrameFactory()
- .createFrame());
+ rtreeCursors[i] = new RTreeSearchCursor((IRTreeInteriorFrame) lsmInitialState
+ .getRTreeInteriorFrameFactory().createFrame(), (IRTreeLeafFrame) lsmInitialState
+ .getRTreeLeafFrameFactory().createFrame());
- btreeCursors[i] = new BTreeRangeSearchCursor((IBTreeLeafFrame) ((LSMRTreeCursorInitialState) initialState)
- .getBTreeLeafFrameFactory().createFrame(), false);
+ btreeCursors[i] = new BTreeRangeSearchCursor((IBTreeLeafFrame) lsmInitialState.getBTreeLeafFrameFactory()
+ .createFrame(), false);
}
btreeRangePredicate = new RangePredicate(true, null, null, true, true, btreeCmp, btreeCmp);
}
@@ -124,10 +127,17 @@
rtreeCursors = null;
btreeCursors = null;
- try {
- lsmRTree.threadExit();
- } catch (TreeIndexException e) {
- throw new HyracksDataException(e);
+ // If the in-memory RTree was not included in the search, then we don't
+ // need to synchronize with a flush.
+ if (includeMemRTree) {
+ try {
+ lsmRTree.threadExit();
+ } catch (TreeIndexException e) {
+ throw new HyracksDataException(e);
+ }
+ } else {
+ // Synchronize with ongoing merges.
+ searcherRefCount.decrementAndGet();
}
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/Pair.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/Pair.java
new file mode 100644
index 0000000..46bce8e
--- /dev/null
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/Pair.java
@@ -0,0 +1,20 @@
+package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+
+public class Pair<A, B> {
+ private final A first;
+ private final B second;
+
+ public Pair(A first, B second) {
+ super();
+ this.first = first;
+ this.second = second;
+ }
+
+ public A getFirst() {
+ return first;
+ }
+
+ public B getSecond() {
+ return second;
+ }
+}
\ No newline at end of file