LSM indexes now flush upon deactivation (closing), fixing issue 65

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1742 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index c0fe5af..9df8f17 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -32,7 +32,6 @@
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
@@ -56,10 +55,12 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMFlushOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMMergeOperation;
@@ -168,6 +169,15 @@
             return;
         }
 
+        BlockingIOOperationCallback cb = new BlockingIOOperationCallback();
+        ILSMIndexAccessor accessor = createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+        lsmHarness.getIOScheduler().scheduleOperation(accessor.createFlushOperation(cb));
+        try {
+            cb.waitForIO();
+        } catch (InterruptedException e) {
+            throw new HyracksDataException(e);
+        }
+
         for (Object o : diskBTrees) {
             BTree btree = (BTree) o;
             btree.deactivate();
@@ -484,7 +494,7 @@
     }
 
     @Override
-    public IIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
+    public ILSMIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
             ISearchOperationCallback searchCallback) {
         return new LSMBTreeIndexAccessor(lsmHarness, createOpContext(modificationCallback, searchCallback));
     }
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallback.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallback.java
index 0caef1a..96e0a51 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallback.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallback.java
@@ -4,13 +4,22 @@
 
 public class BlockingIOOperationCallback implements ILSMIOOperationCallback {
 
+    private boolean notified = false;
+
     @Override
-    public void callback() {
+    public synchronized void callback() {
         this.notifyAll();
+        notified = true;
     }
 
-    public void block() throws InterruptedException {
-        this.wait();
+    public synchronized void waitForIO() throws InterruptedException {
+        if (!notified) {
+            this.wait();
+        }
+    }
+
+    public synchronized void reset() {
+        notified = false;
     }
 
 }
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index 076323e..5008b04 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -42,10 +42,12 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeFactory;
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
@@ -179,6 +181,16 @@
         }
 
         isOpen = false;
+        
+        BlockingIOOperationCallback cb = new BlockingIOOperationCallback();
+        ILSMIndexAccessor accessor = (ILSMIndexAccessor) createAccessor(NoOpOperationCallback.INSTANCE,
+                NoOpOperationCallback.INSTANCE);
+        lsmHarness.getIOScheduler().scheduleOperation(accessor.createFlushOperation(cb));
+        try {
+            cb.waitForIO();
+        } catch (InterruptedException e) {
+            throw new HyracksDataException(e);
+        }
 
         memComponent.getRTree().deactivate();
         memComponent.getBTree().deactivate();
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 59e74ba..2270868 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -106,6 +106,7 @@
 
     @Override
     public synchronized void deactivate() throws HyracksDataException {
+        super.deactivate();
         for (Object o : diskComponents) {
             LSMRTreeComponent diskComponent = (LSMRTreeComponent) o;
             RTree rtree = diskComponent.getRTree();
@@ -114,7 +115,6 @@
             btree.deactivate();
         }
         diskComponents.clear();
-        super.deactivate();
     }
 
     @Override
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
index 2859d04..a79edb6 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
@@ -37,27 +37,27 @@
 
     @Override
     public void reset() throws HyracksDataException {
-        if (!open) {
-            return;
-        }
-
         depletedRtreeCursors = new boolean[numberOfTrees];
         foundNext = false;
-        for (int i = 0; i < numberOfTrees; i++) {
-            rtreeCursors[i].reset();
-            try {
-                diskRTreeAccessors[i].search(rtreeCursors[i], rtreeSearchPredicate);
-            } catch (IndexException e) {
-                throw new HyracksDataException(e);
+        try {
+            for (int i = 0; i < numberOfTrees; i++) {
+                rtreeCursors[i].reset();
+                try {
+                    diskRTreeAccessors[i].search(rtreeCursors[i], rtreeSearchPredicate);
+                } catch (IndexException e) {
+                    throw new HyracksDataException(e);
+                }
+                if (rtreeCursors[i].hasNext()) {
+                    rtreeCursors[i].next();
+                } else {
+                    depletedRtreeCursors[i] = true;
+                }
             }
-            if (rtreeCursors[i].hasNext()) {
-                rtreeCursors[i].next();
-            } else {
-                depletedRtreeCursors[i] = true;
+        } finally {
+            if (open) {
+                lsmHarness.closeSearchCursor(searcherRefCount, includeMemRTree);
             }
         }
-
-        lsmHarness.closeSearchCursor(searcherRefCount, includeMemRTree);
     }
 
     @Override
@@ -131,6 +131,21 @@
     @Override
     public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
         super.open(initialState, searchPred);
-        reset();
+
+        depletedRtreeCursors = new boolean[numberOfTrees];
+        foundNext = false;
+        for (int i = 0; i < numberOfTrees; i++) {
+            rtreeCursors[i].reset();
+            try {
+                diskRTreeAccessors[i].search(rtreeCursors[i], rtreeSearchPredicate);
+            } catch (IndexException e) {
+                throw new HyracksDataException(e);
+            }
+            if (rtreeCursors[i].hasNext()) {
+                rtreeCursors[i].next();
+            } else {
+                depletedRtreeCursors[i] = true;
+            }
+        }
     }
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 2f94339..36c5255 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -110,12 +110,12 @@
 
     @Override
     public synchronized void deactivate() throws HyracksDataException {
+        super.deactivate();
         for (Object o : diskComponents) {
             RTree rtree = (RTree) o;
             rtree.deactivate();
         }
         diskComponents.clear();
-        super.deactivate();
     }
 
     @Override
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
index 1dddfb7..b910686 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
@@ -136,6 +136,15 @@
         }
 
     }
+    
+    @Override
+    public void reset() throws HyracksDataException {
+        if (includeMemComponent) {
+            memRTreeCursor.reset();
+            memBTreeCursor.reset();
+        }
+        super.reset();
+    }
 
     @Override
     public void close() throws HyracksDataException {