Started implementing a simple merge.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1045 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTree.java
index 2c837bf..251c5a0 100644
--- a/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTree.java
@@ -61,7 +61,6 @@
private final RTreeFactory rTreeFactory;
private final BTreeFactory bTreeFactory;
private final IFileMapManager fileMapManager;
- private int threadReferenceCounter;
private int threadRefCount;
private boolean flushFlag;
@@ -84,7 +83,6 @@
this.inDiskBTreeList = new LinkedList<ITreeIndex>();
this.inDiskTreeCounter = 0;
this.fileMapManager = fileMapManager;
- this.threadReferenceCounter = 0;
this.threadRefCount = 0;
this.created = false;
this.flushFlag = false;
@@ -195,7 +193,106 @@
@Override
public void merge() throws Exception {
- // TODO Auto-generated method stub
+
+ // Cursor setting -- almost the same as search, only difference is
+ // "no cursor for in-memory tree"
+ int numberOfInDiskTrees;
+ synchronized (inDiskRTreeList) {
+ numberOfInDiskTrees = inDiskRTreeList.size();
+ }
+
+ RTreeSearchCursor[] rtreeCursors = new RTreeSearchCursor[numberOfInDiskTrees];
+ BTreeRangeSearchCursor[] btreeCursors = new BTreeRangeSearchCursor[numberOfInDiskTrees];
+
+ for (int i = 0; i < numberOfInDiskTrees; i++) {
+ rtreeCursors[i] = new RTreeSearchCursor((IRTreeInteriorFrame) rtreeInteriorFrameFactory.createFrame(),
+ (IRTreeLeafFrame) rtreeLeafFrameFactory.createFrame());
+
+ btreeCursors[i] = new BTreeRangeSearchCursor((IBTreeLeafFrame) btreeLeafFrameFactory.createFrame(), false);
+ }
+
+ // Create a new in-Disk RTree
+ // Register the RTree information into system.
+ inDiskTreeCounter++;
+ FileReference rtreeFile = new FileReference(new File(getNextFileName(rtreeFileName, inDiskTreeCounter)));
+ // TODO: Delete the file during cleanup.
+ bufferCache.createFile(rtreeFile);
+ int newDiskRTreeId = fileMapManager.lookupFileId(rtreeFile);
+ // TODO: Close the file during cleanup.
+ bufferCache.openFile(newDiskRTreeId);
+
+ // Create new in-Disk RTree.
+ RTree mergedRTree = (RTree) rTreeFactory.createIndexInstance(newDiskRTreeId);
+ mergedRTree.create(newDiskRTreeId);
+ // TODO: Close the RTree during cleanup.
+ mergedRTree.open(newDiskRTreeId);
+
+
+ // Create a new in-Disk BTree, which have full fillfactor.
+ // Register the BTree information into system.
+ FileReference btreeFile = new FileReference(new File(getNextFileName(btreeFileName, inDiskTreeCounter)));
+ // TODO: Delete the file during cleanup.
+ bufferCache.createFile(btreeFile);
+ int newDiskBTreeId = fileMapManager.lookupFileId(btreeFile);
+ // TODO: Close the file during cleanup.
+ bufferCache.openFile(newDiskBTreeId);
+
+ // Create new in-Disk BTree.
+ BTree mergedBTree = (BTree) bTreeFactory.createIndexInstance(newDiskBTreeId);
+ mergedBTree.create(newDiskBTreeId);
+ // TODO: Close the BTree during cleanup.
+ mergedBTree.open(newDiskBTreeId);
+
+ // BulkLoad the tuples from the RTree into the new disk RTree.
+ IIndexBulkLoadContext rtreeBulkLoadCtx = mergedRTree.beginBulkLoad(1.0f);
+
+ // BulkLoad the tuples from the in-memory tree into the new disk BTree.
+ IIndexBulkLoadContext btreeBulkLoadCtx = mergedBTree.beginBulkLoad(1.0f);
+
+ for (int i = 0; i < numberOfInDiskTrees; i++) {
+
+ // scan the RTrees
+ ITreeIndexCursor rtreeScanCursor = new RTreeSearchCursor(
+ (IRTreeInteriorFrame) rtreeInteriorFrameFactory.createFrame(),
+ (IRTreeLeafFrame) rtreeLeafFrameFactory.createFrame());
+ SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
+
+ ITreeIndexAccessor onDiskRTreeAccessor = inDiskRTreeList.get(i).createAccessor();
+ onDiskRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
+
+ try {
+ while (rtreeScanCursor.hasNext()) {
+ rtreeScanCursor.next();
+ ITupleReference frameTuple = rtreeScanCursor.getTuple();
+ mergedRTree.bulkLoadAddTuple(frameTuple, rtreeBulkLoadCtx);
+ }
+ } finally {
+ rtreeScanCursor.close();
+ }
+
+
+ // scan the BTrees
+ ITreeIndexCursor btreeScanCursor = new BTreeRangeSearchCursor(
+ (IBTreeLeafFrame) btreeLeafFrameFactory.createFrame(), false);
+ RangePredicate btreeNullPredicate = new RangePredicate(true, null, null, true, true, null, null);
+ ITreeIndexAccessor onDiskBTreeAccessor = inDiskBTreeList.get(i).createAccessor();
+ onDiskBTreeAccessor.search(btreeScanCursor, btreeNullPredicate);
+
+ try {
+ while (btreeScanCursor.hasNext()) {
+ btreeScanCursor.next();
+ ITupleReference frameTuple = btreeScanCursor.getTuple();
+ mergedBTree.bulkLoadAddTuple(frameTuple, btreeBulkLoadCtx);
+ }
+ } finally {
+ btreeScanCursor.close();
+ }
+
+ }
+ mergedRTree.endBulkLoad(rtreeBulkLoadCtx);
+ mergedBTree.endBulkLoad(btreeBulkLoadCtx);
+
+ // TODO: complete the merge code
}
@@ -207,10 +304,10 @@
ITreeIndexCursor rtreeScanCursor = new RTreeSearchCursor(
(IRTreeInteriorFrame) rtreeInteriorFrameFactory.createFrame(),
(IRTreeLeafFrame) rtreeLeafFrameFactory.createFrame());
- SearchPredicate searchPredicate = new SearchPredicate(null, null);
+ SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
ITreeIndexAccessor memRTreeAccessor = memRTree.createAccessor();
- memRTreeAccessor.search(rtreeScanCursor, searchPredicate);
+ memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
// Create a new in-Disk RTree
@@ -299,21 +396,6 @@
memRTree.create(rtreeFileId);
memBTree.create(btreeFileId);
}
-
- public void decreaseThreadReferenceCounter() throws Exception {
- synchronized (this) {
- threadReferenceCounter--;
- if (flushFlag == true) {
- if (threadReferenceCounter == 0) {
- flush();
- flushFlag = false;
- return;
- } else if (threadReferenceCounter < 0) {
- throw new Error("Thread reference counter is below zero. This indicates a programming error!");
- }
- }
- }
- }
public void threadEnter() {
threadRefCount++;
@@ -379,7 +461,7 @@
while (continuePerformOp == false) {
synchronized (this) {
if (!flushFlag) {
- threadReferenceCounter++;
+ threadRefCount++;
continuePerformOp = true;
}
}
diff --git a/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTreeSearchCursor.java b/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTreeSearchCursor.java
index 17f2d95..e8f29d4 100644
--- a/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTreeSearchCursor.java
+++ b/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTreeSearchCursor.java
@@ -19,8 +19,8 @@
public class LSMRTreeSearchCursor implements ITreeIndexCursor {
- private BTreeRangeSearchCursor[] btreeCursors;
private RTreeSearchCursor[] rtreeCursors;
+ private BTreeRangeSearchCursor[] btreeCursors;
private BTree.BTreeAccessor memBtreeAccessor;
private ITreeIndexAccessor[] onDiskBTreeAccessors;
private int currentCursror;
@@ -100,15 +100,8 @@
onDiskBTreeAccessors = new ITreeIndexAccessor[numberOfTrees - 1];
for (int i = 0; i < numberOfTrees; i++) {
- if (i < ((LSMRTreeCursorInitialState) initialState).getNumberOfTrees() - 1) { // we
- // already
- // have
- // an
- // accessor
- // for
- // the
- // in-memory
- // b-tree
+ // we already have an accessor for the in-memory b-tree
+ if (i < numberOfTrees - 1) {
onDiskBTreeAccessors[i] = lsmRTree.getInDiskBTreeList().get(i).createAccessor();
}
@@ -131,7 +124,7 @@
@Override
public void close() throws Exception {
- lsmRTree.decreaseThreadReferenceCounter();
+ lsmRTree.threadExit();
for (int i = 0; i < numberOfTrees; i++) {
rtreeCursors[i].close();
btreeCursors[i].close();