LSM indexes now flush upon deactivation (closing), fixing issue 65
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1742 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index c0fe5af..9df8f17 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -32,7 +32,6 @@
import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
@@ -56,10 +55,12 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMFlushOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMMergeOperation;
@@ -168,6 +169,15 @@
return;
}
+ BlockingIOOperationCallback cb = new BlockingIOOperationCallback();
+ ILSMIndexAccessor accessor = createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ lsmHarness.getIOScheduler().scheduleOperation(accessor.createFlushOperation(cb));
+ try {
+ cb.waitForIO();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+
for (Object o : diskBTrees) {
BTree btree = (BTree) o;
btree.deactivate();
@@ -484,7 +494,7 @@
}
@Override
- public IIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
+ public ILSMIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
ISearchOperationCallback searchCallback) {
return new LSMBTreeIndexAccessor(lsmHarness, createOpContext(modificationCallback, searchCallback));
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallback.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallback.java
index 0caef1a..96e0a51 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallback.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallback.java
@@ -4,13 +4,22 @@
public class BlockingIOOperationCallback implements ILSMIOOperationCallback {
+ private boolean notified = false;
+
@Override
- public void callback() {
+ public synchronized void callback() {
this.notifyAll();
+ notified = true;
}
- public void block() throws InterruptedException {
- this.wait();
+ public synchronized void waitForIO() throws InterruptedException {
+ if (!notified) {
+ this.wait();
+ }
+ }
+
+ public synchronized void reset() {
+ notified = false;
}
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index 076323e..5008b04 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -42,10 +42,12 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeFactory;
import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
@@ -179,6 +181,16 @@
}
isOpen = false;
+
+ BlockingIOOperationCallback cb = new BlockingIOOperationCallback();
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ lsmHarness.getIOScheduler().scheduleOperation(accessor.createFlushOperation(cb));
+ try {
+ cb.waitForIO();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
memComponent.getRTree().deactivate();
memComponent.getBTree().deactivate();
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 59e74ba..2270868 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -106,6 +106,7 @@
@Override
public synchronized void deactivate() throws HyracksDataException {
+ super.deactivate();
for (Object o : diskComponents) {
LSMRTreeComponent diskComponent = (LSMRTreeComponent) o;
RTree rtree = diskComponent.getRTree();
@@ -114,7 +115,6 @@
btree.deactivate();
}
diskComponents.clear();
- super.deactivate();
}
@Override
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
index 2859d04..a79edb6 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
@@ -37,27 +37,27 @@
@Override
public void reset() throws HyracksDataException {
- if (!open) {
- return;
- }
-
depletedRtreeCursors = new boolean[numberOfTrees];
foundNext = false;
- for (int i = 0; i < numberOfTrees; i++) {
- rtreeCursors[i].reset();
- try {
- diskRTreeAccessors[i].search(rtreeCursors[i], rtreeSearchPredicate);
- } catch (IndexException e) {
- throw new HyracksDataException(e);
+ try {
+ for (int i = 0; i < numberOfTrees; i++) {
+ rtreeCursors[i].reset();
+ try {
+ diskRTreeAccessors[i].search(rtreeCursors[i], rtreeSearchPredicate);
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
+ }
+ if (rtreeCursors[i].hasNext()) {
+ rtreeCursors[i].next();
+ } else {
+ depletedRtreeCursors[i] = true;
+ }
}
- if (rtreeCursors[i].hasNext()) {
- rtreeCursors[i].next();
- } else {
- depletedRtreeCursors[i] = true;
+ } finally {
+ if (open) {
+ lsmHarness.closeSearchCursor(searcherRefCount, includeMemRTree);
}
}
-
- lsmHarness.closeSearchCursor(searcherRefCount, includeMemRTree);
}
@Override
@@ -131,6 +131,21 @@
@Override
public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
super.open(initialState, searchPred);
- reset();
+
+ depletedRtreeCursors = new boolean[numberOfTrees];
+ foundNext = false;
+ for (int i = 0; i < numberOfTrees; i++) {
+ rtreeCursors[i].reset();
+ try {
+ diskRTreeAccessors[i].search(rtreeCursors[i], rtreeSearchPredicate);
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
+ }
+ if (rtreeCursors[i].hasNext()) {
+ rtreeCursors[i].next();
+ } else {
+ depletedRtreeCursors[i] = true;
+ }
+ }
}
}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 2f94339..36c5255 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -110,12 +110,12 @@
@Override
public synchronized void deactivate() throws HyracksDataException {
+ super.deactivate();
for (Object o : diskComponents) {
RTree rtree = (RTree) o;
rtree.deactivate();
}
diskComponents.clear();
- super.deactivate();
}
@Override
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
index 1dddfb7..b910686 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
@@ -136,6 +136,15 @@
}
}
+
+ @Override
+ public void reset() throws HyracksDataException {
+ if (includeMemComponent) {
+ memRTreeCursor.reset();
+ memBTreeCursor.reset();
+ }
+ super.reset();
+ }
@Override
public void close() throws HyracksDataException {