added LSM flush policy interface and dummy/naive impl for hyracks testing
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1565 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 454db50..0a44fb0 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -46,6 +46,7 @@
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponentFinalizer;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
@@ -85,13 +86,14 @@
private boolean isOpen = false;
- public LSMBTree(IBufferCache memBufferCache, IOperationCallback memOpCallback, InMemoryFreePageManager memFreePageManager,
- ITreeIndexFrameFactory interiorFrameFactory, ITreeIndexFrameFactory insertLeafFrameFactory,
- ITreeIndexFrameFactory deleteLeafFrameFactory, ILSMFileManager fileNameManager,
- BTreeFactory diskBTreeFactory, BTreeFactory bulkLoadBTreeFactory, IFileMapProvider diskFileMapProvider,
- int fieldCount, IBinaryComparatorFactory[] cmpFactories) {
- memBTree = new BTree(memBufferCache, memOpCallback, fieldCount, cmpFactories, memFreePageManager, interiorFrameFactory,
- insertLeafFrameFactory);
+ public LSMBTree(IBufferCache memBufferCache, IOperationCallback memOpCallback,
+ InMemoryFreePageManager memFreePageManager, ITreeIndexFrameFactory interiorFrameFactory,
+ ITreeIndexFrameFactory insertLeafFrameFactory, ITreeIndexFrameFactory deleteLeafFrameFactory,
+ ILSMFileManager fileNameManager, BTreeFactory diskBTreeFactory, BTreeFactory bulkLoadBTreeFactory,
+ IFileMapProvider diskFileMapProvider, int fieldCount, IBinaryComparatorFactory[] cmpFactories,
+ ILSMFlushPolicy flushPolicy) {
+ memBTree = new BTree(memBufferCache, memOpCallback, fieldCount, cmpFactories, memFreePageManager,
+ interiorFrameFactory, insertLeafFrameFactory);
this.memFreePageManager = memFreePageManager;
this.insertLeafFrameFactory = insertLeafFrameFactory;
this.deleteLeafFrameFactory = deleteLeafFrameFactory;
@@ -102,7 +104,7 @@
this.cmpFactories = cmpFactories;
this.diskBTrees = new LinkedList<Object>();
this.fileManager = fileNameManager;
- lsmHarness = new LSMHarness(this);
+ lsmHarness = new LSMHarness(this, flushPolicy);
componentFinalizer = new TreeIndexComponentFinalizer(diskFileMapProvider);
}
@@ -161,12 +163,12 @@
public boolean insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException,
TreeIndexException {
LSMBTreeOpContext ctx = (LSMBTreeOpContext) ictx;
-
- if(ctx.getIndexOp() == IndexOp.PHYSICALDELETE) {
+
+ if (ctx.getIndexOp() == IndexOp.PHYSICALDELETE) {
ctx.memBTreeAccessor.delete(tuple);
return true;
}
-
+
ctx.memBTreeAccessor.upsert(tuple);
return true;
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java
index f0da04b..8592589 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java
@@ -34,6 +34,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeFileManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialFlushPolicy;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -62,7 +63,7 @@
ILSMFileManager fileNameManager = new LSMTreeFileManager(ioManager, diskFileMapProvider, onDiskDir);
LSMBTree lsmTree = new LSMBTree(memBufferCache, memOpCallback, memFreePageManager, interiorFrameFactory,
insertLeafFrameFactory, deleteLeafFrameFactory, fileNameManager, diskBTreeFactory,
- bulkLoadBTreeFactory, diskFileMapProvider, typeTraits.length, cmpFactories);
+ bulkLoadBTreeFactory, diskFileMapProvider, typeTraits.length, cmpFactories, SequentialFlushPolicy.INSTANCE);
return lsmTree;
}
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFlushPolicy.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFlushPolicy.java
new file mode 100644
index 0000000..82cee54
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFlushPolicy.java
@@ -0,0 +1,6 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+
+public interface ILSMFlushPolicy {
+ public void shouldFlush(ILSMIndex harness);
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
new file mode 100644
index 0000000..da5a8bb
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
+
+/**
+ * Methods to be implemented by an LSM index, which are called from {@link LSMHarness}.
+ * The implementations of the methods below should be thread agnostic.
+ * Synchronization of LSM operations like updates/searches/flushes/merges are
+ * done by the {@link LSMHarness}. For example, a flush() implementation should only
+ * create and return the new on-disk component, ignoring the fact that
+ * concurrent searches/updates/merges may be ongoing.
+ */
+public interface ILSMHarness extends IIndex {
+ public boolean insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException,
+ IndexException;
+
+ public void search(IIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred, IIndexOpContext ictx,
+ boolean includeMemComponent, AtomicInteger searcherRefCount) throws HyracksDataException, IndexException;
+
+ public Object merge(List<Object> mergedComponents) throws HyracksDataException, IndexException;
+
+ public void addMergedComponent(Object newComponent, List<Object> mergedComponents);
+
+ public void cleanUpAfterMerge(List<Object> mergedComponents) throws HyracksDataException;
+
+ public Object flush() throws HyracksDataException, IndexException;
+
+ public void addFlushedComponent(Object index);
+
+ public InMemoryFreePageManager getInMemoryFreePageManager();
+
+ public void resetInMemoryComponent() throws HyracksDataException;
+
+ public List<Object> getDiskComponents();
+
+ public ILSMComponentFinalizer getComponentFinalizer();
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index 6f0f866..6c10d26 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -23,7 +23,7 @@
/**
* Client handle for performing operations
- * (insert/delete/update/search/diskorderscan/merge/flush) on an {@link ILSMIndex}.
+ * (insert/delete/update/search/diskorderscan/merge/flush) on an {@link ILSMHarness}.
* An {@link ILSMIndexAccessor} is not thread safe, but different {@link ILSMIndexAccessor}s
* can concurrently operate on the same {@link ILSMIndex} (i.e., the {@link ILSMIndex} must allow
* concurrent operations).
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index b3e5b2b..a3d7577 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -28,6 +28,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
/**
@@ -64,9 +65,13 @@
// We alternate between searcherRefCountA and searcherRefCountB.
private AtomicInteger searcherRefCount = searcherRefCountA;
- public LSMHarness(ILSMIndex lsmIndex) {
+ // Flush and Merge Policies
+ private final ILSMFlushPolicy flushPolicy;
+
+ public LSMHarness(ILSMIndex lsmIndex, ILSMFlushPolicy flushPolicy) {
this.lsmIndex = lsmIndex;
this.threadRefCount = 0;
+ this.flushPolicy = flushPolicy;
this.flushFlag = false;
}
@@ -85,8 +90,7 @@
// Flush will only be handled by last exiting thread.
if (flushFlag && threadRefCount == 0) {
- flush();
- flushFlag = false;
+ flushPolicy.shouldFlush(lsmIndex);
}
}
}
@@ -139,6 +143,9 @@
synchronized (diskComponentsSync) {
lsmIndex.addFlushedComponent(newComponent);
}
+
+ // Unblock entering threads waiting for the flush
+ flushFlag = false;
}
public List<Object> search(IIndexCursor cursor, ISearchPredicate pred, IIndexOpContext ctx,
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 8370221..04c7ae5 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -47,6 +47,7 @@
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponentFinalizer;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
@@ -123,7 +124,7 @@
ITreeIndexFrameFactory btreeInteriorFrameFactory, ITreeIndexFrameFactory btreeLeafFrameFactory,
ILSMFileManager fileManager, RTreeFactory diskRTreeFactory, BTreeFactory diskBTreeFactory,
IFileMapProvider diskFileMapProvider, int fieldCount, IBinaryComparatorFactory[] rtreeCmpFactories,
- IBinaryComparatorFactory[] btreeCmpFactories) {
+ IBinaryComparatorFactory[] btreeCmpFactories, ILSMFlushPolicy flushPolicy) {
RTree memRTree = new RTree(memBufferCache, fieldCount, rtreeCmpFactories, memFreePageManager,
rtreeInteriorFrameFactory, rtreeLeafFrameFactory);
// TODO: Do we need another operation callback here?
@@ -142,7 +143,7 @@
this.diskRTreeFactory = diskRTreeFactory;
this.btreeCmpFactories = btreeCmpFactories;
this.rtreeCmpFactories = rtreeCmpFactories;
- this.lsmHarness = new LSMHarness(this);
+ this.lsmHarness = new LSMHarness(this, flushPolicy);
componentFinalizer = new LSMRTreeComponentFinalizer(diskFileMapProvider);
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java
index 85659c5..d014f55 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java
@@ -30,6 +30,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialFlushPolicy;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTree;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.RTreeFactory;
@@ -71,7 +72,7 @@
LSMRTree lsmTree = new LSMRTree(memBufferCache, memFreePageManager, rtreeInteriorFrameFactory,
rtreeLeafFrameFactory, btreeInteriorFrameFactory, btreeLeafFrameFactory, fileNameManager,
diskRTreeFactory, diskBTreeFactory, diskFileMapProvider, typeTraits.length, rtreeCmpFactories,
- btreeCmpFactories);
+ btreeCmpFactories, SequentialFlushPolicy.INSTANCE);
return lsmTree;
}
}
diff --git a/pom.xml b/pom.xml
index a032aa7..98c3467 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,7 +25,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<forkMode>pertest</forkMode>
- <argLine>-enableassertions -Djava.util.logging.config.file=${user.home}/logging.properties -Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=8000,suspend=n</argLine>
+ <argLine>-enableassertions -Djava.util.logging.config.file=${user.home}/logging.properties -Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=8000,suspend=n -Xms1024m -Xmx1024m</argLine>
</configuration>
</plugin>
</plugins>