added LSM flush policy interface and dummy/naive impl for hyracks testing

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1565 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 454db50..0a44fb0 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -46,6 +46,7 @@
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponentFinalizer;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
@@ -85,13 +86,14 @@
 
     private boolean isOpen = false;
 
-    public LSMBTree(IBufferCache memBufferCache, IOperationCallback memOpCallback, InMemoryFreePageManager memFreePageManager,
-            ITreeIndexFrameFactory interiorFrameFactory, ITreeIndexFrameFactory insertLeafFrameFactory,
-            ITreeIndexFrameFactory deleteLeafFrameFactory, ILSMFileManager fileNameManager,
-            BTreeFactory diskBTreeFactory, BTreeFactory bulkLoadBTreeFactory, IFileMapProvider diskFileMapProvider,
-            int fieldCount, IBinaryComparatorFactory[] cmpFactories) {
-        memBTree = new BTree(memBufferCache, memOpCallback, fieldCount, cmpFactories, memFreePageManager, interiorFrameFactory,
-                insertLeafFrameFactory);
+    public LSMBTree(IBufferCache memBufferCache, IOperationCallback memOpCallback,
+            InMemoryFreePageManager memFreePageManager, ITreeIndexFrameFactory interiorFrameFactory,
+            ITreeIndexFrameFactory insertLeafFrameFactory, ITreeIndexFrameFactory deleteLeafFrameFactory,
+            ILSMFileManager fileNameManager, BTreeFactory diskBTreeFactory, BTreeFactory bulkLoadBTreeFactory,
+            IFileMapProvider diskFileMapProvider, int fieldCount, IBinaryComparatorFactory[] cmpFactories,
+            ILSMFlushPolicy flushPolicy) {
+        memBTree = new BTree(memBufferCache, memOpCallback, fieldCount, cmpFactories, memFreePageManager,
+                interiorFrameFactory, insertLeafFrameFactory);
         this.memFreePageManager = memFreePageManager;
         this.insertLeafFrameFactory = insertLeafFrameFactory;
         this.deleteLeafFrameFactory = deleteLeafFrameFactory;
@@ -102,7 +104,7 @@
         this.cmpFactories = cmpFactories;
         this.diskBTrees = new LinkedList<Object>();
         this.fileManager = fileNameManager;
-        lsmHarness = new LSMHarness(this);
+        lsmHarness = new LSMHarness(this, flushPolicy);
         componentFinalizer = new TreeIndexComponentFinalizer(diskFileMapProvider);
     }
 
@@ -161,12 +163,12 @@
     public boolean insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException,
             TreeIndexException {
         LSMBTreeOpContext ctx = (LSMBTreeOpContext) ictx;
-        
-        if(ctx.getIndexOp() == IndexOp.PHYSICALDELETE) {
+
+        if (ctx.getIndexOp() == IndexOp.PHYSICALDELETE) {
             ctx.memBTreeAccessor.delete(tuple);
             return true;
         }
-        
+
         ctx.memBTreeAccessor.upsert(tuple);
 
         return true;
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java
index f0da04b..8592589 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java
@@ -34,6 +34,7 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeFileManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialFlushPolicy;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
 
@@ -62,7 +63,7 @@
         ILSMFileManager fileNameManager = new LSMTreeFileManager(ioManager, diskFileMapProvider, onDiskDir);
         LSMBTree lsmTree = new LSMBTree(memBufferCache, memOpCallback, memFreePageManager, interiorFrameFactory,
                 insertLeafFrameFactory, deleteLeafFrameFactory, fileNameManager, diskBTreeFactory,
-                bulkLoadBTreeFactory, diskFileMapProvider, typeTraits.length, cmpFactories);
+                bulkLoadBTreeFactory, diskFileMapProvider, typeTraits.length, cmpFactories, SequentialFlushPolicy.INSTANCE);
         return lsmTree;
     }
 }
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFlushPolicy.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFlushPolicy.java
new file mode 100644
index 0000000..82cee54
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFlushPolicy.java
@@ -0,0 +1,6 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+
+public interface ILSMFlushPolicy {
+    public void shouldFlush(ILSMIndex harness);
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
new file mode 100644
index 0000000..da5a8bb
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
+
+/**
+ * Methods to be implemented by an LSM index, which are called from {@link LSMHarness}.
+ * The implementations of the methods below should be thread agnostic.
+ * Synchronization of LSM operations like updates/searches/flushes/merges are
+ * done by the {@link LSMHarness}. For example, a flush() implementation should only
+ * create and return the new on-disk component, ignoring the fact that
+ * concurrent searches/updates/merges may be ongoing.
+ */
+public interface ILSMHarness extends IIndex {
+    public boolean insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException,
+            IndexException;
+
+    public void search(IIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred, IIndexOpContext ictx,
+            boolean includeMemComponent, AtomicInteger searcherRefCount) throws HyracksDataException, IndexException;
+
+    public Object merge(List<Object> mergedComponents) throws HyracksDataException, IndexException;
+
+    public void addMergedComponent(Object newComponent, List<Object> mergedComponents);
+
+    public void cleanUpAfterMerge(List<Object> mergedComponents) throws HyracksDataException;
+
+    public Object flush() throws HyracksDataException, IndexException;
+
+    public void addFlushedComponent(Object index);
+
+    public InMemoryFreePageManager getInMemoryFreePageManager();
+
+    public void resetInMemoryComponent() throws HyracksDataException;
+
+    public List<Object> getDiskComponents();
+
+    public ILSMComponentFinalizer getComponentFinalizer();
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index 6f0f866..6c10d26 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -23,7 +23,7 @@
 
 /**
  * Client handle for performing operations
- * (insert/delete/update/search/diskorderscan/merge/flush) on an {@link ILSMIndex}.
+ * (insert/delete/update/search/diskorderscan/merge/flush) on an {@link ILSMHarness}.
  * An {@link ILSMIndexAccessor} is not thread safe, but different {@link ILSMIndexAccessor}s
  * can concurrently operate on the same {@link ILSMIndex} (i.e., the {@link ILSMIndex} must allow
  * concurrent operations).
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index b3e5b2b..a3d7577 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 
 /**
@@ -64,9 +65,13 @@
     // We alternate between searcherRefCountA and searcherRefCountB.
     private AtomicInteger searcherRefCount = searcherRefCountA;
 
-    public LSMHarness(ILSMIndex lsmIndex) {
+    // Flush and Merge Policies
+    private final ILSMFlushPolicy flushPolicy;
+
+    public LSMHarness(ILSMIndex lsmIndex, ILSMFlushPolicy flushPolicy) {
         this.lsmIndex = lsmIndex;
         this.threadRefCount = 0;
+        this.flushPolicy = flushPolicy;
         this.flushFlag = false;
     }
 
@@ -85,8 +90,7 @@
 
             // Flush will only be handled by last exiting thread.
             if (flushFlag && threadRefCount == 0) {
-                flush();
-                flushFlag = false;
+                flushPolicy.shouldFlush(lsmIndex);
             }
         }
     }
@@ -139,6 +143,9 @@
         synchronized (diskComponentsSync) {
             lsmIndex.addFlushedComponent(newComponent);
         }
+
+        // Unblock entering threads waiting for the flush
+        flushFlag = false;
     }
 
     public List<Object> search(IIndexCursor cursor, ISearchPredicate pred, IIndexOpContext ctx,
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 8370221..04c7ae5 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -47,6 +47,7 @@
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponentFinalizer;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
@@ -123,7 +124,7 @@
             ITreeIndexFrameFactory btreeInteriorFrameFactory, ITreeIndexFrameFactory btreeLeafFrameFactory,
             ILSMFileManager fileManager, RTreeFactory diskRTreeFactory, BTreeFactory diskBTreeFactory,
             IFileMapProvider diskFileMapProvider, int fieldCount, IBinaryComparatorFactory[] rtreeCmpFactories,
-            IBinaryComparatorFactory[] btreeCmpFactories) {
+            IBinaryComparatorFactory[] btreeCmpFactories, ILSMFlushPolicy flushPolicy) {
         RTree memRTree = new RTree(memBufferCache, fieldCount, rtreeCmpFactories, memFreePageManager,
                 rtreeInteriorFrameFactory, rtreeLeafFrameFactory);
         // TODO: Do we need another operation callback here?
@@ -142,7 +143,7 @@
         this.diskRTreeFactory = diskRTreeFactory;
         this.btreeCmpFactories = btreeCmpFactories;
         this.rtreeCmpFactories = rtreeCmpFactories;
-        this.lsmHarness = new LSMHarness(this);
+        this.lsmHarness = new LSMHarness(this, flushPolicy);
         componentFinalizer = new LSMRTreeComponentFinalizer(diskFileMapProvider);
     }
 
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java
index 85659c5..d014f55 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java
@@ -30,6 +30,7 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
 import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialFlushPolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTree;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeFileManager;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.RTreeFactory;
@@ -71,7 +72,7 @@
         LSMRTree lsmTree = new LSMRTree(memBufferCache, memFreePageManager, rtreeInteriorFrameFactory,
                 rtreeLeafFrameFactory, btreeInteriorFrameFactory, btreeLeafFrameFactory, fileNameManager,
                 diskRTreeFactory, diskBTreeFactory, diskFileMapProvider, typeTraits.length, rtreeCmpFactories,
-                btreeCmpFactories);
+                btreeCmpFactories, SequentialFlushPolicy.INSTANCE);
         return lsmTree;
     }
 }
diff --git a/pom.xml b/pom.xml
index a032aa7..98c3467 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,7 +25,7 @@
         <artifactId>maven-surefire-plugin</artifactId>
         <configuration>
             <forkMode>pertest</forkMode>
-            <argLine>-enableassertions -Djava.util.logging.config.file=${user.home}/logging.properties -Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=8000,suspend=n</argLine>
+            <argLine>-enableassertions -Djava.util.logging.config.file=${user.home}/logging.properties -Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=8000,suspend=n -Xms1024m -Xmx1024m</argLine>
         </configuration>
       </plugin>
     </plugins>