Committed, so Alex can put some changes to avoid conflicts.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1132 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 930fc80..99f082f 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -27,6 +27,8 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
+import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
@@ -291,10 +293,38 @@
TreeIndexException {
LSMRTreeOpContext ctx = (LSMRTreeOpContext) ictx;
if (ctx.getIndexOp() == IndexOp.INSERT) {
- ctx.memRTreeAccessor.insert(tuple);
+ // Before each insert, we must check whether there exist a killer
+ // tuple in the memBTree. If we find a killer tuple, we must truly
+ // delete the existing tuple from the BTree, and then insert it to
+ // memRTree. Otherwise, the old killer tuple will kill the newly
+ // added RTree tuple.
+ RangePredicate btreeRangePredicate = new RangePredicate(tuple, tuple, true, true,
+ ctx.getBTreeMultiComparator(), ctx.getBTreeMultiComparator());
+ ITreeIndexCursor cursor = ctx.memBTreeAccessor.createSearchCursor();
+ ctx.memBTreeAccessor.search(cursor, btreeRangePredicate);
+ ITupleReference tupleCopy = null;
+ try {
+ if (cursor.hasNext()) {
+ cursor.next();
+ tupleCopy = TupleUtils.copyTuple(cursor.getTuple());
+ }
+ } finally {
+ cursor.close();
+ }
+ if (tupleCopy != null) {
+ ctx.memRTreeAccessor.insert(tupleCopy);
+ } else {
+ ctx.memRTreeAccessor.insert(tuple);
+ }
+
} else {
- // Assert ctx.getIndexOp() == IndexOp.DELETE
- ctx.memBTreeAccessor.insert(tuple);
+
+ try {
+ ctx.memBTreeAccessor.insert(tuple);
+ } catch (BTreeDuplicateKeyException e) {
+ // Do nothing, because one delete tuple is enough to indicate
+ // that all the corresponding insert tuples are deleted
+ }
}
}
@@ -492,4 +522,8 @@
return new LSMRTreeSearchCursor();
}
}
+
+ public IBinaryComparatorFactory[] getComparatorFactories() {
+ return rtreeCmpFactories;
+ }
}