Added flush controller and operation tracker interfaces for LSM indexes
Replaced sequential scheduler with immediate scheduler
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1602 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
index 1687af8..8c019dc 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
@@ -19,8 +19,9 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicyProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialSchedulerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.FlushControllerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTrackerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateSchedulerProvider;
import edu.uci.ics.hyracks.tests.am.common.LSMTreeOperatorTestHelper;
public class LSMBTreeOperatorTestHelper extends LSMTreeOperatorTestHelper {
@@ -32,9 +33,9 @@
}
public IIndexDataflowHelperFactory createDataFlowHelperFactory() {
- return new LSMBTreeDataflowHelperFactory(
- new ImmediateFlushPolicyProvider(SequentialSchedulerProvider.INSTANCE),
- new ConstantMergePolicyProvider(SequentialSchedulerProvider.INSTANCE, MERGE_THRESHOLD));
+ return new LSMBTreeDataflowHelperFactory(new FlushControllerProvider(), new ConstantMergePolicyProvider(
+ ImmediateSchedulerProvider.INSTANCE, MERGE_THRESHOLD), new RefCountingOperationTrackerProvider(),
+ ImmediateSchedulerProvider.INSTANCE);
}
}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreePrimaryIndexScanOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreePrimaryIndexScanOperatorTest.java
index e023feb..d751399 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreePrimaryIndexScanOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreePrimaryIndexScanOperatorTest.java
@@ -22,6 +22,7 @@
import edu.uci.ics.hyracks.tests.am.common.ITreeIndexOperatorTestHelper;
public class LSMBTreePrimaryIndexScanOperatorTest extends BTreePrimaryIndexScanOperatorTest {
+
protected ITreeIndexOperatorTestHelper createTestHelper() throws HyracksException {
return new LSMBTreeOperatorTestHelper(TestStorageManagerComponentHolder.getIOManager());
}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
index f1652a5..3418e0b 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
@@ -20,8 +20,9 @@
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicyProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialSchedulerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.FlushControllerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTrackerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.dataflow.LSMRTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreePolicyType;
import edu.uci.ics.hyracks.tests.am.common.LSMTreeOperatorTestHelper;
@@ -38,8 +39,9 @@
IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
IBinaryComparatorFactory[] btreeComparatorFactories) {
return new LSMRTreeDataflowHelperFactory(valueProviderFactories, rtreePolicyType, btreeComparatorFactories,
- new ImmediateFlushPolicyProvider(SequentialSchedulerProvider.INSTANCE),
- new ConstantMergePolicyProvider(SequentialSchedulerProvider.INSTANCE, MERGE_THRESHOLD));
+ new FlushControllerProvider(), new ConstantMergePolicyProvider(ImmediateSchedulerProvider.INSTANCE,
+ MERGE_THRESHOLD), new RefCountingOperationTrackerProvider(),
+ ImmediateSchedulerProvider.INSTANCE);
}
}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java
index bba4e2b..468c7ef 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java
@@ -20,8 +20,9 @@
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicyProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialSchedulerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.FlushControllerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTrackerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.dataflow.LSMRTreeWithAntiMatterTuplesDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreePolicyType;
import edu.uci.ics.hyracks.tests.am.common.LSMTreeOperatorTestHelper;
@@ -29,7 +30,7 @@
public class LSMRTreeWithAntiMatterTuplesOperatorTestHelper extends LSMTreeOperatorTestHelper {
private static final int MERGE_THRESHOLD = 3;
-
+
public LSMRTreeWithAntiMatterTuplesOperatorTestHelper(IOManager ioManager) {
super(ioManager);
}
@@ -38,8 +39,9 @@
IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
IBinaryComparatorFactory[] btreeComparatorFactories) {
return new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(valueProviderFactories, rtreePolicyType,
- btreeComparatorFactories, new ImmediateFlushPolicyProvider(SequentialSchedulerProvider.INSTANCE),
- new ConstantMergePolicyProvider(SequentialSchedulerProvider.INSTANCE, MERGE_THRESHOLD));
+ btreeComparatorFactories, new FlushControllerProvider(), new ConstantMergePolicyProvider(
+ ImmediateSchedulerProvider.INSTANCE, MERGE_THRESHOLD),
+ new RefCountingOperationTrackerProvider(), ImmediateSchedulerProvider.INSTANCE);
}
}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java
index ded4570..907a2b5 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java
@@ -25,8 +25,10 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
@@ -38,21 +40,28 @@
private final int memPageSize;
private final int memNumPages;
- private final ILSMFlushPolicy flushPolicy;
+ private final ILSMFlushController flushController;
private final ILSMMergePolicy mergePolicy;
+ private final ILSMOperationTracker opTracker;
+ private final ILSMIOScheduler ioScheduler;
public LSMBTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
- ILSMFlushPolicy flushPolicy, ILSMMergePolicy mergePolicy) {
- this(opDesc, ctx, partition, DEFAULT_MEM_PAGE_SIZE, DEFAULT_MEM_NUM_PAGES, flushPolicy, mergePolicy);
+ ILSMFlushController flushController, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
+ ILSMIOScheduler ioScheduler) {
+ this(opDesc, ctx, partition, DEFAULT_MEM_PAGE_SIZE, DEFAULT_MEM_NUM_PAGES, flushController, mergePolicy,
+ opTracker, ioScheduler);
}
public LSMBTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
- int memPageSize, int memNumPages, ILSMFlushPolicy flushPolicy, ILSMMergePolicy mergePolicy) {
+ int memPageSize, int memNumPages, ILSMFlushController flushController, ILSMMergePolicy mergePolicy,
+ ILSMOperationTracker opTracker, ILSMIOScheduler ioScheduler) {
super(opDesc, ctx, partition);
this.memPageSize = memPageSize;
this.memNumPages = memNumPages;
- this.flushPolicy = flushPolicy;
+ this.flushController = flushController;
this.mergePolicy = mergePolicy;
+ this.opTracker = opTracker;
+ this.ioScheduler = ioScheduler;
}
@Override
@@ -66,9 +75,9 @@
file.delete();
}
InMemoryFreePageManager memFreePageManager = new InMemoryFreePageManager(memNumPages, metaDataFrameFactory);
- return LSMBTreeUtils.createLSMTree(memBufferCache, memFreePageManager,
- ctx.getIOManager(), file.getFile().getPath(), opDesc.getStorageManager()
- .getBufferCache(ctx), opDesc.getStorageManager().getFileMapProvider(ctx), treeOpDesc
- .getTreeIndexTypeTraits(), treeOpDesc.getTreeIndexComparatorFactories(), flushPolicy, mergePolicy);
+ return LSMBTreeUtils.createLSMTree(memBufferCache, memFreePageManager, ctx.getIOManager(), file.getFile()
+ .getPath(), opDesc.getStorageManager().getBufferCache(ctx), opDesc.getStorageManager()
+ .getFileMapProvider(ctx), treeOpDesc.getTreeIndexTypeTraits(), treeOpDesc
+ .getTreeIndexComparatorFactories(), flushController, mergePolicy, opTracker, ioScheduler);
}
}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java
index 3cd6ffc..b05543b 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java
@@ -19,26 +19,34 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushControllerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
public class LSMBTreeDataflowHelperFactory implements IIndexDataflowHelperFactory {
private static final long serialVersionUID = 1L;
- private final ILSMFlushPolicyProvider flushPolicyProvider;
+ private final ILSMFlushControllerProvider flushControllerProvider;
private final ILSMMergePolicyProvider mergePolicyProvider;
+ private final ILSMOperationTrackerProvider opTrackerProvider;
+ private final ILSMIOSchedulerProvider ioSchedulerProvider;
- public LSMBTreeDataflowHelperFactory(ILSMFlushPolicyProvider flushPolicyProvider,
- ILSMMergePolicyProvider mergePolicyProvider) {
- this.flushPolicyProvider = flushPolicyProvider;
+ public LSMBTreeDataflowHelperFactory(ILSMFlushControllerProvider flushControllerProvider,
+ ILSMMergePolicyProvider mergePolicyProvider, ILSMOperationTrackerProvider opTrackerProvider,
+ ILSMIOSchedulerProvider ioSchedulerProvider) {
+ this.flushControllerProvider = flushControllerProvider;
this.mergePolicyProvider = mergePolicyProvider;
+ this.opTrackerProvider = opTrackerProvider;
+ this.ioSchedulerProvider = ioSchedulerProvider;
}
@Override
public IndexDataflowHelper createIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
int partition) {
- return new LSMBTreeDataflowHelper(opDesc, ctx, partition, flushPolicyProvider.getFlushPolicy(),
- mergePolicyProvider.getMergePolicy());
+ return new LSMBTreeDataflowHelper(opDesc, ctx, partition, flushControllerProvider.getFlushController(),
+ mergePolicyProvider.getMergePolicy(), opTrackerProvider.getOperationTracker(),
+ ioSchedulerProvider.getIOScheduler());
}
}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 8e9bf36..6742734 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -50,9 +50,11 @@
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponentFinalizer;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
@@ -95,8 +97,8 @@
ITreeIndexFrameFactory interiorFrameFactory, ITreeIndexFrameFactory insertLeafFrameFactory,
ITreeIndexFrameFactory deleteLeafFrameFactory, ILSMFileManager fileNameManager,
BTreeFactory diskBTreeFactory, BTreeFactory bulkLoadBTreeFactory, IFileMapProvider diskFileMapProvider,
- int fieldCount, IBinaryComparatorFactory[] cmpFactories, ILSMFlushPolicy flushPolicy,
- ILSMMergePolicy mergePolicy) {
+ int fieldCount, IBinaryComparatorFactory[] cmpFactories, ILSMFlushController flushController,
+ ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOScheduler ioScheduler) {
memBTree = new BTree(memBufferCache, fieldCount, cmpFactories, memFreePageManager, interiorFrameFactory,
insertLeafFrameFactory);
this.memFreePageManager = memFreePageManager;
@@ -109,7 +111,7 @@
this.cmpFactories = cmpFactories;
this.diskBTrees = new LinkedList<Object>();
this.fileManager = fileNameManager;
- lsmHarness = new LSMHarness(this, flushPolicy, mergePolicy);
+ lsmHarness = new LSMHarness(this, flushController, mergePolicy, opTracker, ioScheduler);
componentFinalizer = new TreeIndexComponentFinalizer(diskFileMapProvider);
}
@@ -348,41 +350,40 @@
return diskBTrees;
}
- @Override
- public ITreeIndexBulkLoader createBulkLoader(float fillLevel)
- throws TreeIndexException {
- return new LSMBTreeBulkLoader(fillLevel);
- }
-
- public class LSMBTreeBulkLoader implements ITreeIndexBulkLoader {
- private final BTree diskBTree;
- private final BTreeBulkLoader bulkLoader;
+ @Override
+ public ITreeIndexBulkLoader createBulkLoader(float fillLevel) throws TreeIndexException {
+ return new LSMBTreeBulkLoader(fillLevel);
+ }
- public LSMBTreeBulkLoader(float fillFactor) throws TreeIndexException {
- try {
- diskBTree = createBulkLoadTarget();
- } catch (HyracksDataException e) {
- throw new TreeIndexException(e);
- }
- bulkLoader = (BTreeBulkLoader) diskBTree.createBulkLoader(0.7f);
- }
+ public class LSMBTreeBulkLoader implements ITreeIndexBulkLoader {
+ private final BTree diskBTree;
+ private final BTreeBulkLoader bulkLoader;
- @Override
- public void add(ITupleReference tuple) throws HyracksDataException {
- bulkLoader.add(tuple);
- }
+ public LSMBTreeBulkLoader(float fillFactor) throws TreeIndexException {
+ try {
+ diskBTree = createBulkLoadTarget();
+ } catch (HyracksDataException e) {
+ throw new TreeIndexException(e);
+ }
+ bulkLoader = (BTreeBulkLoader) diskBTree.createBulkLoader(0.7f);
+ }
- @Override
- public void end() throws HyracksDataException {
- bulkLoader.end();
- lsmHarness.addBulkLoadedComponent(diskBTree);
- }
-
- }
+ @Override
+ public void add(ITupleReference tuple) throws HyracksDataException {
+ bulkLoader.add(tuple);
+ }
- @Deprecated
- private ITreeIndexBulkLoader bulkloader;
-
+ @Override
+ public void end() throws HyracksDataException {
+ bulkLoader.end();
+ lsmHarness.addBulkLoadedComponent(diskBTree);
+ }
+
+ }
+
+ @Deprecated
+ private ITreeIndexBulkLoader bulkloader;
+
@Override
public IIndexBulkLoadContext beginBulkLoad(float fillFactor) throws TreeIndexException, HyracksDataException {
bulkloader = createBulkLoader(fillFactor);
@@ -475,4 +476,19 @@
public IBufferCache getBufferCache() {
return diskBufferCache;
}
+
+ @Override
+ public ILSMFlushController getFlushController() {
+ return lsmHarness.getFlushController();
+ }
+
+ @Override
+ public ILSMOperationTracker getOperationTracker() {
+ return lsmHarness.getOperationTracker();
+ }
+
+ @Override
+ public ILSMIOScheduler getIOScheduler() {
+ return lsmHarness.getIOScheduler();
+ }
}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java
index f91912f..cd3f159 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeUtils.java
@@ -28,8 +28,10 @@
import edu.uci.ics.hyracks.storage.am.lsm.btree.tuples.LSMBTreeCopyTupleWriterFactory;
import edu.uci.ics.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleWriterFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
@@ -38,10 +40,11 @@
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
public class LSMBTreeUtils {
- public static LSMBTree createLSMTree(InMemoryBufferCache memBufferCache, InMemoryFreePageManager memFreePageManager,
- IIOManager ioManager, String onDiskDir, IBufferCache diskBufferCache,
- IFileMapProvider diskFileMapProvider, ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories,
- ILSMFlushPolicy flushPolicy, ILSMMergePolicy mergePolicy) {
+ public static LSMBTree createLSMTree(InMemoryBufferCache memBufferCache,
+ InMemoryFreePageManager memFreePageManager, IIOManager ioManager, String onDiskDir,
+ IBufferCache diskBufferCache, IFileMapProvider diskFileMapProvider, ITypeTraits[] typeTraits,
+ IBinaryComparatorFactory[] cmpFactories, ILSMFlushController flushController, ILSMMergePolicy mergePolicy,
+ ILSMOperationTracker opTracker, ILSMIOScheduler ioScheduler) {
LSMBTreeTupleWriterFactory insertTupleWriterFactory = new LSMBTreeTupleWriterFactory(typeTraits,
cmpFactories.length, false);
LSMBTreeTupleWriterFactory deleteTupleWriterFactory = new LSMBTreeTupleWriterFactory(typeTraits,
@@ -55,14 +58,15 @@
ITreeIndexMetaDataFrameFactory metaFrameFactory = new LIFOMetaDataFrameFactory();
LinkedListFreePageManagerFactory freePageManagerFactory = new LinkedListFreePageManagerFactory(diskBufferCache,
metaFrameFactory);
- BTreeFactory diskBTreeFactory = new BTreeFactory(diskBufferCache, freePageManagerFactory,
- cmpFactories, typeTraits.length, interiorFrameFactory, copyTupleLeafFrameFactory);
- BTreeFactory bulkLoadBTreeFactory = new BTreeFactory(diskBufferCache, freePageManagerFactory,
- cmpFactories, typeTraits.length, interiorFrameFactory, insertLeafFrameFactory);
+ BTreeFactory diskBTreeFactory = new BTreeFactory(diskBufferCache, freePageManagerFactory, cmpFactories,
+ typeTraits.length, interiorFrameFactory, copyTupleLeafFrameFactory);
+ BTreeFactory bulkLoadBTreeFactory = new BTreeFactory(diskBufferCache, freePageManagerFactory, cmpFactories,
+ typeTraits.length, interiorFrameFactory, insertLeafFrameFactory);
ILSMFileManager fileNameManager = new LSMTreeFileManager(ioManager, diskFileMapProvider, onDiskDir);
- LSMBTree lsmTree = new LSMBTree(memBufferCache, memFreePageManager, interiorFrameFactory, insertLeafFrameFactory,
- deleteLeafFrameFactory, fileNameManager, diskBTreeFactory, bulkLoadBTreeFactory,
- diskFileMapProvider, typeTraits.length, cmpFactories, flushPolicy, mergePolicy);
+ LSMBTree lsmTree = new LSMBTree(memBufferCache, memFreePageManager, interiorFrameFactory,
+ insertLeafFrameFactory, deleteLeafFrameFactory, fileNameManager, diskBTreeFactory,
+ bulkLoadBTreeFactory, diskFileMapProvider, typeTraits.length, cmpFactories, flushController, mergePolicy,
+ opTracker, ioScheduler);
return lsmTree;
}
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFlushController.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFlushController.java
new file mode 100644
index 0000000..fb58880
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFlushController.java
@@ -0,0 +1,7 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+public interface ILSMFlushController {
+ public void setFlushStatus(ILSMIndex index, boolean needsFlush);
+
+ public boolean getFlushStatus(ILSMIndex index);
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFlushControllerProvider.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFlushControllerProvider.java
new file mode 100644
index 0000000..d62b8b4
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFlushControllerProvider.java
@@ -0,0 +1,7 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import java.io.Serializable;
+
+public interface ILSMFlushControllerProvider extends Serializable {
+ public ILSMFlushController getFlushController();
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFlushPolicy.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFlushPolicy.java
deleted file mode 100644
index a977911..0000000
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFlushPolicy.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package edu.uci.ics.hyracks.storage.am.lsm.common.api;
-
-
-public interface ILSMFlushPolicy {
- public void memoryComponentExceededThreshold(ILSMIndex index);
-}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFlushPolicyProvider.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFlushPolicyProvider.java
deleted file mode 100644
index 9e48f40..0000000
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFlushPolicyProvider.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package edu.uci.ics.hyracks.storage.am.lsm.common.api;
-
-import java.io.Serializable;
-
-public interface ILSMFlushPolicyProvider extends Serializable {
- public ILSMFlushPolicy getFlushPolicy();
-}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index da5a8bb..0698295 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -60,4 +60,10 @@
public List<Object> getDiskComponents();
public ILSMComponentFinalizer getComponentFinalizer();
+
+ public ILSMFlushController getFlushController();
+
+ public ILSMOperationTracker getOperationTracker();
+
+ public ILSMIOScheduler getIOScheduler();
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOScheduler.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOScheduler.java
index 866e208..6929232 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOScheduler.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOScheduler.java
@@ -4,4 +4,6 @@
public void scheduleFlush(ILSMIndex index);
public void scheduleMerge(ILSMIndex index);
+
+ public void shutdown();
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index 0c163eb..f934ed1 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -60,4 +60,10 @@
public List<Object> getDiskComponents();
public ILSMComponentFinalizer getComponentFinalizer();
+
+ public ILSMFlushController getFlushController();
+
+ public ILSMOperationTracker getOperationTracker();
+
+ public ILSMIOScheduler getIOScheduler();
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTracker.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTracker.java
new file mode 100644
index 0000000..b845bd0
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTracker.java
@@ -0,0 +1,8 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+
+public interface ILSMOperationTracker {
+ public void threadEnter(ILSMIndex index);
+
+ public void threadExit(ILSMIndex index);
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTrackerProvider.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTrackerProvider.java
new file mode 100644
index 0000000..7bb638b
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTrackerProvider.java
@@ -0,0 +1,8 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import java.io.Serializable;
+
+
+public interface ILSMOperationTrackerProvider extends Serializable {
+ public ILSMOperationTracker getOperationTracker();
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/FlushController.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/FlushController.java
new file mode 100644
index 0000000..6fd18f9
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/FlushController.java
@@ -0,0 +1,19 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+public class FlushController implements ILSMFlushController {
+
+ private boolean needsFlush = false;
+
+ @Override
+ public void setFlushStatus(ILSMIndex index, boolean needsFlush) {
+ this.needsFlush = needsFlush;
+ }
+
+ @Override
+ public boolean getFlushStatus(ILSMIndex index) {
+ return needsFlush;
+ }
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/FlushControllerProvider.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/FlushControllerProvider.java
new file mode 100644
index 0000000..22807e4
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/FlushControllerProvider.java
@@ -0,0 +1,15 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushControllerProvider;
+
+public class FlushControllerProvider implements ILSMFlushControllerProvider {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ILSMFlushController getFlushController() {
+ return new FlushController();
+ }
+
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateFlushPolicy.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateFlushPolicy.java
deleted file mode 100644
index 0fb6545..0000000
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateFlushPolicy.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
-
-public class ImmediateFlushPolicy implements ILSMFlushPolicy {
-
- private final ILSMIOScheduler ioScheduler;
-
- public ImmediateFlushPolicy(ILSMIOScheduler ioScheduler) {
- this.ioScheduler = ioScheduler;
- }
-
- @Override
- public void memoryComponentExceededThreshold(final ILSMIndex index) {
- // Schedule a flush immediately when the memory component is full
- ioScheduler.scheduleFlush(index);
- }
-}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateFlushPolicyProvider.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateFlushPolicyProvider.java
deleted file mode 100644
index 8e73c09..0000000
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateFlushPolicyProvider.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicyProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOSchedulerProvider;
-
-public class ImmediateFlushPolicyProvider implements ILSMFlushPolicyProvider {
-
- private static final long serialVersionUID = 1L;
-
- private final ILSMIOSchedulerProvider schedulerProvider;
-
- public ImmediateFlushPolicyProvider(ILSMIOSchedulerProvider schedulerProvider) {
- this.schedulerProvider = schedulerProvider;
- }
-
- @Override
- public ILSMFlushPolicy getFlushPolicy() {
- return new ImmediateFlushPolicy(schedulerProvider.getIOScheduler());
- }
-
-}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateScheduler.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateScheduler.java
new file mode 100644
index 0000000..7c76953
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateScheduler.java
@@ -0,0 +1,43 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+
+public enum ImmediateScheduler implements ILSMIOScheduler {
+ INSTANCE;
+
+ @Override
+ public void scheduleFlush(final ILSMIndex index) {
+ try {
+ ((ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE))
+ .flush();
+ } catch (HyracksDataException e) {
+ e.printStackTrace();
+ } catch (IndexException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void scheduleMerge(final ILSMIndex index) {
+ try {
+ ((ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE))
+ .merge();
+ } catch (LSMMergeInProgressException e) {
+ // Ignore!
+ } catch (HyracksDataException e) {
+ e.printStackTrace();
+ } catch (IndexException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ }
+
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialSchedulerProvider.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateSchedulerProvider.java
similarity index 70%
rename from hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialSchedulerProvider.java
rename to hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateSchedulerProvider.java
index 83e2136..3341059 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialSchedulerProvider.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateSchedulerProvider.java
@@ -3,12 +3,12 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOSchedulerProvider;
-public enum SequentialSchedulerProvider implements ILSMIOSchedulerProvider {
+public enum ImmediateSchedulerProvider implements ILSMIOSchedulerProvider {
INSTANCE;
@Override
public ILSMIOScheduler getIOScheduler() {
- return SequentialScheduler.INSTANCE;
+ return ImmediateScheduler.INSTANCE;
}
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 7900b29..42d1277 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -28,9 +28,11 @@
import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
/**
* Common code for synchronizing LSM operations like
@@ -51,11 +53,6 @@
// All accesses to the LSM-Tree's on-disk components are synchronized on diskComponentsSync.
private Object diskComponentsSync = new Object();
- // For synchronizing all operations with flushes.
- // Currently, all operations block during a flush.
- private int threadRefCount;
- private boolean flushFlag;
-
// For synchronizing searchers with a concurrent merge.
private AtomicBoolean isMerging = new AtomicBoolean(false);
private AtomicInteger searcherRefCountA = new AtomicInteger(0);
@@ -67,56 +64,30 @@
private AtomicInteger searcherRefCount = searcherRefCountA;
// Flush and Merge Policies
- private final ILSMFlushPolicy flushPolicy;
+ private final ILSMFlushController flushController;
private final ILSMMergePolicy mergePolicy;
+ private final ILSMOperationTracker opTracker;
+ private final ILSMIOScheduler ioScheduler;
- public LSMHarness(ILSMIndex lsmIndex, ILSMFlushPolicy flushPolicy, ILSMMergePolicy mergePolicy) {
+ public LSMHarness(ILSMIndex lsmIndex, ILSMFlushController flushController, ILSMMergePolicy mergePolicy,
+ ILSMOperationTracker opTracker, ILSMIOScheduler ioScheduler) {
this.lsmIndex = lsmIndex;
- this.threadRefCount = 0;
- this.flushPolicy = flushPolicy;
+ this.opTracker = opTracker;
+ this.flushController = flushController;
this.mergePolicy = mergePolicy;
- this.flushFlag = false;
+ this.ioScheduler = ioScheduler;
}
- public void threadEnter() {
- threadRefCount++;
- }
-
- public void threadExit() throws HyracksDataException, IndexException {
- synchronized (this) {
- threadRefCount--;
-
- // Check if we've reached or exceeded the maximum number of pages.
- if (!flushFlag && lsmIndex.getInMemoryFreePageManager().isFull()) {
- flushFlag = true;
- }
-
- // Flush will only be handled by last exiting thread.
- if (flushFlag && threadRefCount == 0) {
- flushPolicy.memoryComponentExceededThreshold(lsmIndex);
- }
+ private void threadExit() {
+ if (!lsmIndex.getFlushController().getFlushStatus(lsmIndex) && lsmIndex.getInMemoryFreePageManager().isFull()) {
+ lsmIndex.getFlushController().setFlushStatus(lsmIndex, true);
}
+ opTracker.threadExit(lsmIndex);
}
public void insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ctx) throws HyracksDataException,
IndexException {
- boolean waitForFlush = true;
- do {
- synchronized (this) {
- // flushFlag may be set to true even though the flush has not occurred yet.
- // If flushFlag is set, then the flush is queued to occur by the last exiting thread.
- // This operation should wait for that flush to occur before proceeding.
- if (!flushFlag) {
- // Increment the threadRefCount in order to block the possibility of a concurrent flush.
- // The corresponding threadExit() call is in LSMTreeRangeSearchCursor.close()
- threadEnter();
-
- // A flush is not pending, so proceed with the operation.
- waitForFlush = false;
- }
- }
- } while (waitForFlush);
-
+ opTracker.threadEnter(lsmIndex);
// It is possible, due to concurrent execution of operations, that an operation will
// fail. In such a case, simply retry the operation. Refer to the specific LSMIndex code
// to see exactly why an operation might fail.
@@ -149,7 +120,7 @@
}
// Unblock entering threads waiting for the flush
- flushFlag = false;
+ flushController.setFlushStatus(lsmIndex, false);
}
public List<Object> search(IIndexCursor cursor, ISearchPredicate pred, IIndexOpContext ctx,
@@ -157,22 +128,7 @@
// If the search doesn't include the in-memory component, then we don't have
// to synchronize with a flush.
if (includeMemComponent) {
- boolean waitForFlush = true;
- do {
- synchronized (this) {
- // flushFlag may be set to true even though the flush has not occurred yet.
- // If flushFlag is set, then the flush is queued to occur by the last exiting thread.
- // This operation should wait for that flush to occur before proceeding.
- if (!flushFlag) {
- // Increment the threadRefCount in order to block the possibility of a concurrent flush.
- // The corresponding threadExit() call is in LSMTreeRangeSearchCursor.close()
- threadEnter();
-
- // A flush is not pending, so proceed with the operation.
- waitForFlush = false;
- }
- }
- } while (waitForFlush);
+ opTracker.threadEnter(lsmIndex);
}
// Get a snapshot of the current on-disk Trees.
@@ -259,11 +215,7 @@
// If the in-memory Tree was not included in the search, then we don't
// need to synchronize with a flush.
if (includeMemComponent) {
- try {
- threadExit();
- } catch (IndexException e) {
- throw new HyracksDataException(e);
- }
+ threadExit();
}
// A merge may be waiting on this searcher to finish searching the on-disk components.
// Decrement the searcherRefCount so that the merge process is able to cleanup any old
@@ -282,4 +234,16 @@
mergePolicy.diskComponentAdded(lsmIndex, lsmIndex.getDiskComponents().size());
}
}
+
+ public ILSMFlushController getFlushController() {
+ return flushController;
+ }
+
+ public ILSMOperationTracker getOperationTracker() {
+ return opTracker;
+ }
+
+ public ILSMIOScheduler getIOScheduler() {
+ return ioScheduler;
+ }
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTracker.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTracker.java
new file mode 100644
index 0000000..aeb1227
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTracker.java
@@ -0,0 +1,42 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+
+public class RefCountingOperationTracker implements ILSMOperationTracker {
+
+ private int threadRefCount = 0;
+
+ @Override
+ public void threadEnter(ILSMIndex index) {
+ boolean waitForFlush = true;
+ do {
+ synchronized (this) {
+ // flushFlag may be set to true even though the flush has not occurred yet.
+ // If flushFlag is set, then the flush is queued to occur by the last exiting thread.
+ // This operation should wait for that flush to occur before proceeding.
+ if (!index.getFlushController().getFlushStatus(index)) {
+ // Increment the threadRefCount in order to block the possibility of a concurrent flush.
+ // The corresponding threadExit() call is in LSMTreeRangeSearchCursor.close()
+ threadRefCount++;
+
+ // A flush is not pending, so proceed with the operation.
+ waitForFlush = false;
+ }
+ }
+ } while (waitForFlush);
+ }
+
+ @Override
+ public void threadExit(ILSMIndex index) {
+ synchronized (this) {
+ threadRefCount--;
+
+ // Flush will only be handled by last exiting thread.
+ if (index.getFlushController().getFlushStatus(index) && threadRefCount == 0) {
+ index.getIOScheduler().scheduleFlush(index);
+ }
+ }
+ }
+
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTrackerProvider.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTrackerProvider.java
new file mode 100644
index 0000000..e32e697
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTrackerProvider.java
@@ -0,0 +1,15 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
+
+public class RefCountingOperationTrackerProvider implements ILSMOperationTrackerProvider {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ILSMOperationTracker getOperationTracker() {
+ return new RefCountingOperationTracker();
+ }
+
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialScheduler.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialScheduler.java
deleted file mode 100644
index 90ef3e3..0000000
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialScheduler.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-
-public enum SequentialScheduler implements ILSMIOScheduler {
- INSTANCE;
-
- private final ExecutorService executor = Executors.newSingleThreadExecutor();
-
- @Override
- public void scheduleFlush(final ILSMIndex index) {
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- ((ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE)).flush();
- } catch (HyracksDataException e) {
- e.printStackTrace();
- } catch (IndexException e) {
- e.printStackTrace();
- }
- }
- });
- }
-
- @Override
- public void scheduleMerge(final ILSMIndex index) {
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- ((ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE)).merge();
- } catch (LSMMergeInProgressException e) {
- // Ignore!
- } catch (HyracksDataException e) {
- e.printStackTrace();
- } catch (IndexException e) {
- e.printStackTrace();
- }
- }
- });
- }
-
-}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/AbstractLSMRTreeDataflowHelper.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/AbstractLSMRTreeDataflowHelper.java
index d96aea9..c7bd9a5 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/AbstractLSMRTreeDataflowHelper.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/AbstractLSMRTreeDataflowHelper.java
@@ -22,15 +22,16 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeInMemoryBufferCache;
@@ -50,36 +51,35 @@
protected final IBinaryComparatorFactory[] btreeComparatorFactories;
protected final IPrimitiveValueProviderFactory[] valueProviderFactories;
protected final RTreePolicyType rtreePolicyType;
- protected final ILSMFlushPolicy flushPolicy;
+ protected final ILSMFlushController flushController;
protected final ILSMMergePolicy mergePolicy;
+ protected final ILSMOperationTracker opTracker;
+ protected final ILSMIOScheduler ioScheduler;
public AbstractLSMRTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
IBinaryComparatorFactory[] btreeComparatorFactories,
IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
- ILSMFlushPolicy flushPolicy, ILSMMergePolicy mergePolicy) {
- super(opDesc, ctx, partition);
- memPageSize = DEFAULT_MEM_PAGE_SIZE;
- memNumPages = DEFAULT_MEM_NUM_PAGES;
- this.btreeComparatorFactories = btreeComparatorFactories;
- this.valueProviderFactories = valueProviderFactories;
- this.rtreePolicyType = rtreePolicyType;
- this.flushPolicy = flushPolicy;
- this.mergePolicy = mergePolicy;
+ ILSMFlushController flushController, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
+ ILSMIOScheduler ioScheduler) {
+ this(opDesc, ctx, partition, DEFAULT_MEM_PAGE_SIZE, DEFAULT_MEM_NUM_PAGES, btreeComparatorFactories,
+ valueProviderFactories, rtreePolicyType, flushController, mergePolicy, opTracker, ioScheduler);
}
- public AbstractLSMRTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
- IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists, int memPageSize,
- int memNumPages, IBinaryComparatorFactory[] btreeComparatorFactories,
+ public AbstractLSMRTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
+ int memPageSize, int memNumPages, IBinaryComparatorFactory[] btreeComparatorFactories,
IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
- ILSMFlushPolicy flushPolicy, ILSMMergePolicy mergePolicy) {
+ ILSMFlushController flushController, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
+ ILSMIOScheduler ioScheduler) {
super(opDesc, ctx, partition);
this.memPageSize = memPageSize;
this.memNumPages = memNumPages;
this.btreeComparatorFactories = btreeComparatorFactories;
this.valueProviderFactories = valueProviderFactories;
this.rtreePolicyType = rtreePolicyType;
- this.flushPolicy = flushPolicy;
+ this.flushController = flushController;
this.mergePolicy = mergePolicy;
+ this.opTracker = opTracker;
+ this.ioScheduler = ioScheduler;
}
@Override
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelper.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelper.java
index 5a09621..cf9ac0a 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelper.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelper.java
@@ -20,13 +20,14 @@
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.IIOManager;
-import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.utils.LSMRTreeUtils;
import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreePolicyType;
@@ -38,18 +39,19 @@
public LSMRTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
IBinaryComparatorFactory[] btreeComparatorFactories,
IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
- ILSMFlushPolicy flushPolicy, ILSMMergePolicy mergePolicy) {
- super(opDesc, ctx, partition, btreeComparatorFactories, valueProviderFactories, rtreePolicyType, flushPolicy,
- mergePolicy);
+ ILSMFlushController flushController, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
+ ILSMIOScheduler ioScheduler) {
+ super(opDesc, ctx, partition, btreeComparatorFactories, valueProviderFactories, rtreePolicyType,
+ flushController, mergePolicy, opTracker, ioScheduler);
}
- public LSMRTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
- IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists, int memPageSize,
- int memNumPages, IBinaryComparatorFactory[] btreeComparatorFactories,
+ public LSMRTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
+ int memPageSize, int memNumPages, IBinaryComparatorFactory[] btreeComparatorFactories,
IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
- ILSMFlushPolicy flushPolicy, ILSMMergePolicy mergePolicy) {
- super(opDesc, ctx, opCallbackProvider, partition, createIfNotExists, memPageSize, memNumPages,
- btreeComparatorFactories, valueProviderFactories, rtreePolicyType, flushPolicy, mergePolicy);
+ ILSMFlushController flushController, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
+ ILSMIOScheduler ioScheduler) {
+ super(opDesc, ctx, partition, memPageSize, memNumPages, btreeComparatorFactories, valueProviderFactories,
+ rtreePolicyType, flushController, mergePolicy, opTracker, ioScheduler);
}
@Override
@@ -61,7 +63,7 @@
try {
return LSMRTreeUtils.createLSMTree(memBufferCache, memFreePageManager, ioManager, onDiskDir,
diskBufferCache, diskFileMapProvider, typeTraits, rtreeCmpFactories, btreeCmpFactories,
- valueProviderFactories, rtreePolicyType, flushPolicy, mergePolicy);
+ valueProviderFactories, rtreePolicyType, flushController, mergePolicy, opTracker, ioScheduler);
} catch (TreeIndexException e) {
throw new HyracksDataException(e);
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java
index 91ff2cb..aab6fcc 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java
@@ -21,8 +21,10 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushControllerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreePolicyType;
public class LSMRTreeDataflowHelperFactory implements IIndexDataflowHelperFactory {
@@ -32,24 +34,29 @@
private final IBinaryComparatorFactory[] btreeComparatorFactories;
private final IPrimitiveValueProviderFactory[] valueProviderFactories;
private final RTreePolicyType rtreePolicyType;
- private final ILSMFlushPolicyProvider flushPolicyProvider;
+ private final ILSMFlushControllerProvider flushControllerProvider;
private final ILSMMergePolicyProvider mergePolicyProvider;
+ private final ILSMOperationTrackerProvider opTrackerProvider;
+ private final ILSMIOSchedulerProvider ioSchedulerProvider;
public LSMRTreeDataflowHelperFactory(IPrimitiveValueProviderFactory[] valueProviderFactories,
RTreePolicyType rtreePolicyType, IBinaryComparatorFactory[] btreeComparatorFactories,
- ILSMFlushPolicyProvider flushPolicyProvider,
- ILSMMergePolicyProvider mergePolicyProvider) {
+ ILSMFlushControllerProvider flushControllerProvider, ILSMMergePolicyProvider mergePolicyProvider,
+ ILSMOperationTrackerProvider opTrackerProvider, ILSMIOSchedulerProvider ioSchedulerProvider) {
this.btreeComparatorFactories = btreeComparatorFactories;
this.valueProviderFactories = valueProviderFactories;
this.rtreePolicyType = rtreePolicyType;
- this.flushPolicyProvider = flushPolicyProvider;
+ this.flushControllerProvider = flushControllerProvider;
this.mergePolicyProvider = mergePolicyProvider;
+ this.opTrackerProvider = opTrackerProvider;
+ this.ioSchedulerProvider = ioSchedulerProvider;
}
@Override
public IndexDataflowHelper createIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
int partition) {
return new LSMRTreeDataflowHelper(opDesc, ctx, partition, btreeComparatorFactories, valueProviderFactories,
- rtreePolicyType, flushPolicyProvider.getFlushPolicy(), mergePolicyProvider.getMergePolicy());
+ rtreePolicyType, flushControllerProvider.getFlushController(), mergePolicyProvider.getMergePolicy(),
+ opTrackerProvider.getOperationTracker(), ioSchedulerProvider.getIOScheduler());
}
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelper.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelper.java
index f8b9dfa..1371161 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelper.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelper.java
@@ -20,13 +20,14 @@
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.IIOManager;
-import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.utils.LSMRTreeUtils;
import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreePolicyType;
@@ -37,17 +38,19 @@
public LSMRTreeWithAntiMatterTuplesDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
int partition, IBinaryComparatorFactory[] btreeComparatorFactories,
IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
- ILSMFlushPolicy flushPolicy, ILSMMergePolicy mergePolicy) {
- super(opDesc, ctx, partition, btreeComparatorFactories, valueProviderFactories, rtreePolicyType, flushPolicy, mergePolicy);
+ ILSMFlushController flushController, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
+ ILSMIOScheduler ioScheduler) {
+ super(opDesc, ctx, partition, btreeComparatorFactories, valueProviderFactories, rtreePolicyType,
+ flushController, mergePolicy, opTracker, ioScheduler);
}
public LSMRTreeWithAntiMatterTuplesDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
- IOperationCallbackProvider opCallbackProvider, int partition, boolean createIfNotExists, int memPageSize,
- int memNumPages, IBinaryComparatorFactory[] btreeComparatorFactories,
+ int partition, int memPageSize, int memNumPages, IBinaryComparatorFactory[] btreeComparatorFactories,
IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
- ILSMFlushPolicy flushPolicy, ILSMMergePolicy mergePolicy) {
- super(opDesc, ctx, opCallbackProvider, partition, createIfNotExists, memPageSize, memNumPages,
- btreeComparatorFactories, valueProviderFactories, rtreePolicyType, flushPolicy, mergePolicy);
+ ILSMFlushController flushController, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
+ ILSMIOScheduler ioScheduler) {
+ super(opDesc, ctx, partition, memPageSize, memNumPages, btreeComparatorFactories, valueProviderFactories,
+ rtreePolicyType, flushController, mergePolicy, opTracker, ioScheduler);
}
@Override
@@ -59,7 +62,7 @@
try {
return LSMRTreeUtils.createLSMTreeWithAntiMatterTuples(memBufferCache, memFreePageManager, ioManager,
onDiskDir, diskBufferCache, diskFileMapProvider, typeTraits, rtreeCmpFactories, btreeCmpFactories,
- valueProviderFactories, rtreePolicyType, flushPolicy, mergePolicy);
+ valueProviderFactories, rtreePolicyType, flushController, mergePolicy, opTracker, ioScheduler);
} catch (TreeIndexException e) {
throw new HyracksDataException(e);
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelperFactory.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelperFactory.java
index 5fc49a9..af6b962 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelperFactory.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelperFactory.java
@@ -21,8 +21,10 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushControllerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreePolicyType;
public class LSMRTreeWithAntiMatterTuplesDataflowHelperFactory implements IIndexDataflowHelperFactory {
@@ -32,24 +34,30 @@
private final IBinaryComparatorFactory[] btreeComparatorFactories;
private final IPrimitiveValueProviderFactory[] valueProviderFactories;
private final RTreePolicyType rtreePolicyType;
- private final ILSMFlushPolicyProvider flushPolicyProvider;
+ private final ILSMFlushControllerProvider flushControllerProvider;
private final ILSMMergePolicyProvider mergePolicyProvider;
+ private final ILSMOperationTrackerProvider opTrackerProvider;
+ private final ILSMIOSchedulerProvider ioSchedulerProvider;
public LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(IPrimitiveValueProviderFactory[] valueProviderFactories,
RTreePolicyType rtreePolicyType, IBinaryComparatorFactory[] btreeComparatorFactories,
- ILSMFlushPolicyProvider flushPolicyProvider,
- ILSMMergePolicyProvider mergePolicyProvider) {
+ ILSMFlushControllerProvider flushControllerProvider, ILSMMergePolicyProvider mergePolicyProvider,
+ ILSMOperationTrackerProvider opTrackerProvider, ILSMIOSchedulerProvider ioSchedulerProvider) {
this.btreeComparatorFactories = btreeComparatorFactories;
this.valueProviderFactories = valueProviderFactories;
this.rtreePolicyType = rtreePolicyType;
- this.flushPolicyProvider = flushPolicyProvider;
+ this.flushControllerProvider = flushControllerProvider;
this.mergePolicyProvider = mergePolicyProvider;
+ this.ioSchedulerProvider = ioSchedulerProvider;
+ this.opTrackerProvider = opTrackerProvider;
}
@Override
public IndexDataflowHelper createIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
int partition) {
return new LSMRTreeWithAntiMatterTuplesDataflowHelper(opDesc, ctx, partition, btreeComparatorFactories,
- valueProviderFactories, rtreePolicyType, flushPolicyProvider.getFlushPolicy(), mergePolicyProvider.getMergePolicy());
+ valueProviderFactories, rtreePolicyType, flushControllerProvider.getFlushController(),
+ mergePolicyProvider.getMergePolicy(), opTrackerProvider.getOperationTracker(),
+ ioSchedulerProvider.getIOScheduler());
}
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index 7e61d2e..807548c 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -40,9 +40,11 @@
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponentFinalizer;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeFactory;
@@ -117,12 +119,13 @@
ILSMFileManager fileManager, RTreeFactory diskRTreeFactory, IFileMapProvider diskFileMapProvider,
ILSMComponentFinalizer componentFinalizer, int fieldCount, IBinaryComparatorFactory[] rtreeCmpFactories,
IBinaryComparatorFactory[] btreeCmpFactories, ILinearizeComparatorFactory linearizer,
- int[] comparatorFields, IBinaryComparatorFactory[] linearizerArray, ILSMFlushPolicy flushPolicy, ILSMMergePolicy mergePolicy) {
+ int[] comparatorFields, IBinaryComparatorFactory[] linearizerArray, ILSMFlushController flushController,
+ ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOScheduler ioScheduler) {
RTree memRTree = new RTree(memBufferCache, fieldCount, rtreeCmpFactories, memFreePageManager,
rtreeInteriorFrameFactory, rtreeLeafFrameFactory);
// TODO: Do we need another operation callback here?
- BTree memBTree = new BTree(memBufferCache, fieldCount, btreeCmpFactories,
- memFreePageManager, btreeInteriorFrameFactory, btreeLeafFrameFactory);
+ BTree memBTree = new BTree(memBufferCache, fieldCount, btreeCmpFactories, memFreePageManager,
+ btreeInteriorFrameFactory, btreeLeafFrameFactory);
memComponent = new LSMRTreeComponent(memRTree, memBTree);
this.memFreePageManager = memFreePageManager;
this.diskBufferCache = diskRTreeFactory.getBufferCache();
@@ -135,7 +138,7 @@
this.diskRTreeFactory = diskRTreeFactory;
this.btreeCmpFactories = btreeCmpFactories;
this.rtreeCmpFactories = rtreeCmpFactories;
- this.lsmHarness = new LSMHarness(this, flushPolicy, mergePolicy);
+ this.lsmHarness = new LSMHarness(this, flushController, mergePolicy, opTracker, ioScheduler);
this.componentFinalizer = componentFinalizer;
this.linearizer = linearizer;
this.comparatorFields = comparatorFields;
@@ -342,4 +345,18 @@
return componentFinalizer;
}
+ @Override
+ public ILSMFlushController getFlushController() {
+ return lsmHarness.getFlushController();
+ }
+
+ @Override
+ public ILSMOperationTracker getOperationTracker() {
+ return lsmHarness.getOperationTracker();
+ }
+
+ @Override
+ public ILSMIOScheduler getIOScheduler() {
+ return lsmHarness.getIOScheduler();
+ }
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 2528d3d..871768c 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -43,8 +43,10 @@
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
@@ -69,11 +71,12 @@
ILSMFileManager fileManager, RTreeFactory diskRTreeFactory, BTreeFactory diskBTreeFactory,
IFileMapProvider diskFileMapProvider, int fieldCount, IBinaryComparatorFactory[] rtreeCmpFactories,
IBinaryComparatorFactory[] btreeCmpFactories, ILinearizeComparatorFactory linearizer,
- int[] comparatorFields, IBinaryComparatorFactory[] linearizerArray, ILSMFlushPolicy flushPolicy, ILSMMergePolicy mergePolicy) {
+ int[] comparatorFields, IBinaryComparatorFactory[] linearizerArray, ILSMFlushController flushController,
+ ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOScheduler ioScheduler) {
super(memBufferCache, memFreePageManager, rtreeInteriorFrameFactory, rtreeLeafFrameFactory,
btreeInteriorFrameFactory, btreeLeafFrameFactory, fileManager, diskRTreeFactory, diskFileMapProvider,
new LSMRTreeComponentFinalizer(diskFileMapProvider), fieldCount, rtreeCmpFactories, btreeCmpFactories,
- linearizer, comparatorFields, linearizerArray, flushPolicy, mergePolicy);
+ linearizer, comparatorFields, linearizerArray, flushController, mergePolicy, opTracker, ioScheduler);
this.diskBTreeFactory = diskBTreeFactory;
}
@@ -159,7 +162,8 @@
LSMRTreeCursorInitialState initialState = new LSMRTreeCursorInitialState(numTrees, rtreeLeafFrameFactory,
rtreeInteriorFrameFactory, btreeLeafFrameFactory, ctx.getBTreeMultiComparator(), rTreeAccessors,
- bTreeAccessors, searcherRefCount, includeMemComponent, lsmHarness, comparatorFields, linearizerArray, ctx.searchCallback);
+ bTreeAccessors, searcherRefCount, includeMemComponent, lsmHarness, comparatorFields, linearizerArray,
+ ctx.searchCallback);
cursor.open(initialState, pred);
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index f83ac35..eb79c3a 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -43,8 +43,10 @@
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
@@ -71,11 +73,12 @@
ILSMFileManager fileManager, RTreeFactory diskRTreeFactory, RTreeFactory bulkLoadRTreeFactory,
IFileMapProvider diskFileMapProvider, int fieldCount, IBinaryComparatorFactory[] rtreeCmpFactories,
IBinaryComparatorFactory[] btreeCmpFactories, ILinearizeComparatorFactory linearizer,
- int[] comparatorFields, IBinaryComparatorFactory[] linearizerArray, ILSMFlushPolicy flushPolicy, ILSMMergePolicy mergePolicy) {
+ int[] comparatorFields, IBinaryComparatorFactory[] linearizerArray, ILSMFlushController flushController,
+ ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOScheduler ioScheduler) {
super(memBufferCache, memFreePageManager, rtreeInteriorFrameFactory, rtreeLeafFrameFactory,
btreeInteriorFrameFactory, btreeLeafFrameFactory, fileManager, diskRTreeFactory, diskFileMapProvider,
new TreeIndexComponentFinalizer(diskFileMapProvider), fieldCount, rtreeCmpFactories, btreeCmpFactories,
- linearizer, comparatorFields, linearizerArray, flushPolicy, mergePolicy);
+ linearizer, comparatorFields, linearizerArray, flushController, mergePolicy, opTracker, ioScheduler);
this.bulkLoadRTreeFactory = bulkLoadRTreeFactory;
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java
index bf20bbc..f4680f9 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/utils/LSMRTreeUtils.java
@@ -30,8 +30,10 @@
import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.freepage.LinkedListFreePageManagerFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeFileManager;
@@ -56,8 +58,8 @@
IIOManager ioManager, String onDiskDir, IBufferCache diskBufferCache, IFileMapProvider diskFileMapProvider,
ITypeTraits[] typeTraits, IBinaryComparatorFactory[] rtreeCmpFactories,
IBinaryComparatorFactory[] btreeCmpFactories, IPrimitiveValueProviderFactory[] valueProviderFactories,
- RTreePolicyType rtreePolicyType, ILSMFlushPolicy flushPolicy,
- ILSMMergePolicy mergePolicy) throws TreeIndexException {
+ RTreePolicyType rtreePolicyType, ILSMFlushController flushController, ILSMMergePolicy mergePolicy,
+ ILSMOperationTracker opTracker, ILSMIOScheduler ioScheduler) throws TreeIndexException {
LSMTypeAwareTupleWriterFactory rtreeTupleWriterFactory = new LSMTypeAwareTupleWriterFactory(typeTraits, false);
LSMTypeAwareTupleWriterFactory btreeTupleWriterFactory = new LSMTypeAwareTupleWriterFactory(typeTraits, true);
@@ -76,8 +78,8 @@
RTreeFactory diskRTreeFactory = new RTreeFactory(diskBufferCache, freePageManagerFactory, rtreeCmpFactories,
typeTraits.length, rtreeInteriorFrameFactory, rtreeLeafFrameFactory);
// TODO: Do we need another operation callback here?
- BTreeFactory diskBTreeFactory = new BTreeFactory(diskBufferCache, freePageManagerFactory,
- btreeCmpFactories, typeTraits.length, btreeInteriorFrameFactory, btreeLeafFrameFactory);
+ BTreeFactory diskBTreeFactory = new BTreeFactory(diskBufferCache, freePageManagerFactory, btreeCmpFactories,
+ typeTraits.length, btreeInteriorFrameFactory, btreeLeafFrameFactory);
ILinearizeComparatorFactory linearizer = proposeBestLinearizer(typeTraits, rtreeCmpFactories.length);
int[] comparatorFields = { 0 };
@@ -87,7 +89,8 @@
LSMRTree lsmTree = new LSMRTree(memBufferCache, memFreePageManager, rtreeInteriorFrameFactory,
rtreeLeafFrameFactory, btreeInteriorFrameFactory, btreeLeafFrameFactory, fileNameManager,
diskRTreeFactory, diskBTreeFactory, diskFileMapProvider, typeTraits.length, rtreeCmpFactories,
- btreeCmpFactories, linearizer, comparatorFields, linearizerArray, flushPolicy, mergePolicy);
+ btreeCmpFactories, linearizer, comparatorFields, linearizerArray, flushController, mergePolicy, opTracker,
+ ioScheduler);
return lsmTree;
}
@@ -96,8 +99,8 @@
IBufferCache diskBufferCache, IFileMapProvider diskFileMapProvider, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] rtreeCmpFactories, IBinaryComparatorFactory[] btreeCmpFactories,
IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
- ILSMFlushPolicy flushPolicy, ILSMMergePolicy mergePolicy)
- throws TreeIndexException {
+ ILSMFlushController flushPolicy, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
+ ILSMIOScheduler ioScheduler) throws TreeIndexException {
LSMRTreeTupleWriterFactory rtreeTupleWriterFactory = new LSMRTreeTupleWriterFactory(typeTraits, false);
LSMRTreeTupleWriterFactory btreeTupleWriterFactory = new LSMRTreeTupleWriterFactory(typeTraits, true);
@@ -135,7 +138,8 @@
LSMRTreeWithAntiMatterTuples lsmTree = new LSMRTreeWithAntiMatterTuples(memBufferCache, memFreePageManager,
rtreeInteriorFrameFactory, rtreeLeafFrameFactory, btreeInteriorFrameFactory, btreeLeafFrameFactory,
fileNameManager, diskRTreeFactory, bulkLoadRTreeFactory, diskFileMapProvider, typeTraits.length,
- rtreeCmpFactories, btreeCmpFactories, linearizer, comparatorFields, linearizerArray, flushPolicy, mergePolicy);
+ rtreeCmpFactories, btreeCmpFactories, linearizer, comparatorFields, linearizerArray, flushPolicy,
+ mergePolicy, opTracker, ioScheduler);
return lsmTree;
}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeBulkLoadTest.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeBulkLoadTest.java
index 3615241..acce282 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeBulkLoadTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeBulkLoadTest.java
@@ -28,7 +28,6 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeTestContext;
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
@SuppressWarnings("rawtypes")
public class LSMBTreeBulkLoadTest extends OrderedIndexBulkLoadTest {
@@ -54,7 +53,9 @@
BTreeLeafFrameType leafType) throws Exception {
return LSMBTreeTestContext.create(harness.getMemBufferCache(), harness.getMemFreePageManager(),
harness.getIOManager(), harness.getOnDiskDir(), harness.getDiskBufferCache(),
- harness.getDiskFileMapProvider(), fieldSerdes, numKeys, harness.getFileId(), NoMergePolicy.INSTANCE);
+ harness.getDiskFileMapProvider(), fieldSerdes, numKeys, harness.getFileId(),
+ harness.getFlushController(), harness.getMergePolicy(), harness.getOperationTracker(),
+ harness.getIOScheduler());
}
@Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeDeleteTest.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeDeleteTest.java
index f06f563..771696b 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeDeleteTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeDeleteTest.java
@@ -28,7 +28,6 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeTestContext;
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
@SuppressWarnings("rawtypes")
public class LSMBTreeDeleteTest extends OrderedIndexDeleteTest {
@@ -54,7 +53,9 @@
BTreeLeafFrameType leafType) throws Exception {
return LSMBTreeTestContext.create(harness.getMemBufferCache(), harness.getMemFreePageManager(),
harness.getIOManager(), harness.getOnDiskDir(), harness.getDiskBufferCache(),
- harness.getDiskFileMapProvider(), fieldSerdes, numKeys, harness.getFileId(), NoMergePolicy.INSTANCE);
+ harness.getDiskFileMapProvider(), fieldSerdes, numKeys, harness.getFileId(),
+ harness.getFlushController(), harness.getMergePolicy(), harness.getOperationTracker(),
+ harness.getIOScheduler());
}
@Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java
index 8ebe248..a6544e3 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java
@@ -27,8 +27,6 @@
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
public class LSMBTreeExamplesTest extends OrderedIndexExamplesTest {
private final LSMBTreeTestHarness harness = new LSMBTreeTestHarness();
@@ -38,8 +36,8 @@
throws TreeIndexException {
return LSMBTreeUtils.createLSMTree(harness.getMemBufferCache(), harness.getMemFreePageManager(),
harness.getIOManager(), harness.getOnDiskDir(), harness.getDiskBufferCache(),
- harness.getDiskFileMapProvider(), typeTraits, cmpFactories, new ImmediateFlushPolicy(harness.getIOScheduler()),
- NoMergePolicy.INSTANCE);
+ harness.getDiskFileMapProvider(), typeTraits, cmpFactories, harness.getFlushController(),
+ harness.getMergePolicy(), harness.getOperationTracker(), harness.getIOScheduler());
}
@Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeInsertTest.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeInsertTest.java
index f697b8e..15fbfa1 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeInsertTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeInsertTest.java
@@ -28,7 +28,6 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeTestContext;
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
@SuppressWarnings("rawtypes")
public class LSMBTreeInsertTest extends OrderedIndexInsertTest {
@@ -54,7 +53,9 @@
BTreeLeafFrameType leafType) throws Exception {
return LSMBTreeTestContext.create(harness.getMemBufferCache(), harness.getMemFreePageManager(),
harness.getIOManager(), harness.getOnDiskDir(), harness.getDiskBufferCache(),
- harness.getDiskFileMapProvider(), fieldSerdes, numKeys, harness.getFileId(), NoMergePolicy.INSTANCE);
+ harness.getDiskFileMapProvider(), fieldSerdes, numKeys, harness.getFileId(),
+ harness.getFlushController(), harness.getMergePolicy(), harness.getOperationTracker(),
+ harness.getIOScheduler());
}
@Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTest.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTest.java
index 0b0cb2f..d2d3bd9 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTest.java
@@ -27,7 +27,6 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeTestContext;
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
@SuppressWarnings("rawtypes")
public class LSMBTreeMergeTest extends LSMBTreeMergeTestDriver {
@@ -53,7 +52,9 @@
BTreeLeafFrameType leafType) throws Exception {
return LSMBTreeTestContext.create(harness.getMemBufferCache(), harness.getMemFreePageManager(),
harness.getIOManager(), harness.getOnDiskDir(), harness.getDiskBufferCache(),
- harness.getDiskFileMapProvider(), fieldSerdes, numKeys, harness.getFileId(), NoMergePolicy.INSTANCE);
+ harness.getDiskFileMapProvider(), fieldSerdes, numKeys, harness.getFileId(),
+ harness.getFlushController(), harness.getMergePolicy(), harness.getOperationTracker(),
+ harness.getIOScheduler());
}
@Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMultiBulkLoadTest.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMultiBulkLoadTest.java
index 1237653..9219676 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMultiBulkLoadTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMultiBulkLoadTest.java
@@ -29,7 +29,6 @@
import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeTestContext;
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
@SuppressWarnings("rawtypes")
public class LSMBTreeMultiBulkLoadTest extends OrderedIndexBulkLoadTest {
@@ -55,7 +54,9 @@
BTreeLeafFrameType leafType) throws Exception {
return LSMBTreeTestContext.create(harness.getMemBufferCache(), harness.getMemFreePageManager(),
harness.getIOManager(), harness.getOnDiskDir(), harness.getDiskBufferCache(),
- harness.getDiskFileMapProvider(), fieldSerdes, numKeys, harness.getFileId(), NoMergePolicy.INSTANCE);
+ harness.getDiskFileMapProvider(), fieldSerdes, numKeys, harness.getFileId(),
+ harness.getFlushController(), harness.getMergePolicy(), harness.getOperationTracker(),
+ harness.getIOScheduler());
}
@Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeUpdateTest.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeUpdateTest.java
index 513c1d1..bdefe39 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeUpdateTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeUpdateTest.java
@@ -28,7 +28,6 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeTestContext;
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
@SuppressWarnings("rawtypes")
public class LSMBTreeUpdateTest extends OrderedIndexUpdateTest {
@@ -54,7 +53,9 @@
BTreeLeafFrameType leafType) throws Exception {
return LSMBTreeTestContext.create(harness.getMemBufferCache(), harness.getMemFreePageManager(),
harness.getIOManager(), harness.getOnDiskDir(), harness.getDiskBufferCache(),
- harness.getDiskFileMapProvider(), fieldSerdes, numKeys, harness.getFileId(), NoMergePolicy.INSTANCE);
+ harness.getDiskFileMapProvider(), fieldSerdes, numKeys, harness.getFileId(),
+ harness.getFlushController(), harness.getMergePolicy(), harness.getOperationTracker(),
+ harness.getIOScheduler());
}
@Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java
index 81fa29c..394ffe9 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java
@@ -29,8 +29,6 @@
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
public class LSMBTreeMultiThreadTest extends OrderedIndexMultiThreadTest {
@@ -53,8 +51,8 @@
throws TreeIndexException {
return LSMBTreeUtils.createLSMTree(harness.getMemBufferCache(), harness.getMemFreePageManager(),
harness.getIOManager(), harness.getOnDiskDir(), harness.getDiskBufferCache(),
- harness.getDiskFileMapProvider(), typeTraits, cmpFactories, new ImmediateFlushPolicy(harness.getIOScheduler()),
- NoMergePolicy.INSTANCE);
+ harness.getDiskFileMapProvider(), typeTraits, cmpFactories, harness.getFlushController(),
+ harness.getMergePolicy(), harness.getOperationTracker(), harness.getIOScheduler());
}
@Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
index 38c52ed..62d9115 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
@@ -32,11 +32,13 @@
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree;
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.FlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialScheduler;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTracker;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -60,6 +62,7 @@
protected final int numBatches;
protected final LSMBTree lsmtree;
+ protected final ILSMIOScheduler ioScheduler;
protected IBufferCache memBufferCache;
private final int onDiskPageSize;
private final int onDiskNumPages;
@@ -83,10 +86,10 @@
inMemNumPages);
InMemoryFreePageManager memFreePageManager = new InMemoryFreePageManager(inMemNumPages,
new LIFOMetaDataFrameFactory());
-
+ this.ioScheduler = ImmediateScheduler.INSTANCE;
lsmtree = LSMBTreeUtils.createLSMTree(memBufferCache, memFreePageManager, ioManager, onDiskDir, bufferCache,
- fmp, typeTraits, cmpFactories, new ImmediateFlushPolicy(SequentialScheduler.INSTANCE),
- NoMergePolicy.INSTANCE);
+ fmp, typeTraits, cmpFactories, new FlushController(), NoMergePolicy.INSTANCE,
+ new RefCountingOperationTracker(), ioScheduler);
}
@Override
@@ -124,6 +127,7 @@
@Override
public void deinit() throws Exception {
+ ioScheduler.shutdown();
bufferCache.closeFile(lsmtreeFileId);
bufferCache.close();
memBufferCache.closeFile(lsmtreeFileId);
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestContext.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestContext.java
index fcc6245..8d5c94d 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestContext.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestContext.java
@@ -26,11 +26,12 @@
import edu.uci.ics.hyracks.storage.am.common.CheckTuple;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialScheduler;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -64,12 +65,13 @@
public static LSMBTreeTestContext create(InMemoryBufferCache memBufferCache,
InMemoryFreePageManager memFreePageManager, IOManager ioManager, String onDiskDir,
IBufferCache diskBufferCache, IFileMapProvider diskFileMapProvider, ISerializerDeserializer[] fieldSerdes,
- int numKeyFields, int fileId, ILSMMergePolicy mergePolicy) throws Exception {
+ int numKeyFields, int fileId, ILSMFlushController flushController, ILSMMergePolicy mergePolicy,
+ ILSMOperationTracker opTracker, ILSMIOScheduler ioScheduler) throws Exception {
ITypeTraits[] typeTraits = SerdeUtils.serdesToTypeTraits(fieldSerdes);
IBinaryComparatorFactory[] cmpFactories = SerdeUtils.serdesToComparatorFactories(fieldSerdes, numKeyFields);
- LSMBTree lsmTree = LSMBTreeUtils.createLSMTree(memBufferCache, memFreePageManager,
- ioManager, onDiskDir, diskBufferCache, diskFileMapProvider, typeTraits, cmpFactories,
- new ImmediateFlushPolicy(SequentialScheduler.INSTANCE), mergePolicy);
+ LSMBTree lsmTree = LSMBTreeUtils.createLSMTree(memBufferCache, memFreePageManager, ioManager, onDiskDir,
+ diskBufferCache, diskFileMapProvider, typeTraits, cmpFactories, flushController, mergePolicy,
+ opTracker, ioScheduler);
lsmTree.create(fileId);
lsmTree.open(fileId);
LSMBTreeTestContext testCtx = new LSMBTreeTestContext(fieldSerdes, lsmTree);
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java
index d2f25bc..b05ded0 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java
@@ -30,10 +30,16 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialScheduler;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.FlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateScheduler;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTracker;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -62,6 +68,9 @@
protected InMemoryFreePageManager memFreePageManager;
protected IHyracksTaskContext ctx;
protected ILSMIOScheduler ioScheduler;
+ protected ILSMFlushController flushController;
+ protected ILSMMergePolicy mergePolicy;
+ protected ILSMOperationTracker opTracker;
protected final Random rnd = new Random();
protected final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
@@ -69,13 +78,16 @@
protected String onDiskDir;
public LSMBTreeTestHarness() {
- this.diskPageSize = AccessMethodTestsConfig.LSM_BTREE_DISK_PAGE_SIZE;
- this.diskNumPages = AccessMethodTestsConfig.LSM_BTREE_DISK_NUM_PAGES;
- this.diskMaxOpenFiles = AccessMethodTestsConfig.LSM_BTREE_DISK_MAX_OPEN_FILES;
- this.memPageSize = AccessMethodTestsConfig.LSM_BTREE_MEM_PAGE_SIZE;
- this.memNumPages = AccessMethodTestsConfig.LSM_BTREE_MEM_NUM_PAGES;
- this.hyracksFrameSize = AccessMethodTestsConfig.LSM_BTREE_HYRACKS_FRAME_SIZE;
- this.ioScheduler = SequentialScheduler.INSTANCE;
+ this.diskPageSize = AccessMethodTestsConfig.LSM_BTREE_DISK_PAGE_SIZE;
+ this.diskNumPages = AccessMethodTestsConfig.LSM_BTREE_DISK_NUM_PAGES;
+ this.diskMaxOpenFiles = AccessMethodTestsConfig.LSM_BTREE_DISK_MAX_OPEN_FILES;
+ this.memPageSize = AccessMethodTestsConfig.LSM_BTREE_MEM_PAGE_SIZE;
+ this.memNumPages = AccessMethodTestsConfig.LSM_BTREE_MEM_NUM_PAGES;
+ this.hyracksFrameSize = AccessMethodTestsConfig.LSM_BTREE_HYRACKS_FRAME_SIZE;
+ this.ioScheduler = ImmediateScheduler.INSTANCE;
+ this.mergePolicy = NoMergePolicy.INSTANCE;
+ this.flushController = new FlushController();
+ this.opTracker = new RefCountingOperationTracker();
}
public LSMBTreeTestHarness(int diskPageSize, int diskNumPages, int diskMaxOpenFiles, int memPageSize,
@@ -86,7 +98,10 @@
this.memPageSize = memPageSize;
this.memNumPages = memNumPages;
this.hyracksFrameSize = hyracksFrameSize;
- this.ioScheduler = SequentialScheduler.INSTANCE;
+ this.ioScheduler = ImmediateScheduler.INSTANCE;
+ this.mergePolicy = NoMergePolicy.INSTANCE;
+ this.flushController = new FlushController();
+ this.opTracker = new RefCountingOperationTracker();
}
public void setUp() throws HyracksException {
@@ -102,6 +117,7 @@
}
public void tearDown() throws HyracksDataException {
+ ioScheduler.shutdown();
diskBufferCache.close();
for (IODeviceHandle dev : ioManager.getIODevices()) {
File dir = new File(dev.getPath(), onDiskDir);
@@ -184,4 +200,16 @@
public ILSMIOScheduler getIOScheduler() {
return ioScheduler;
}
+
+ public ILSMOperationTracker getOperationTracker() {
+ return opTracker;
+ }
+
+ public ILSMFlushController getFlushController() {
+ return flushController;
+ }
+
+ public ILSMMergePolicy getMergePolicy() {
+ return mergePolicy;
+ }
}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeBulkLoadTest.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeBulkLoadTest.java
index 288bccd..0ecd7da 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeBulkLoadTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeBulkLoadTest.java
@@ -24,7 +24,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeTestContext;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeTestHarness;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeBulkLoadTest;
@@ -57,7 +56,8 @@
return LSMRTreeTestContext.create(harness.getMemBufferCache(), harness.getMemFreePageManager(),
harness.getIOManager(), harness.getOnDiskDir(), harness.getDiskBufferCache(),
harness.getDiskFileMapProvider(), fieldSerdes, valueProviderFactories, numKeys, rtreePolicyType,
- harness.getFileId(), NoMergePolicy.INSTANCE);
+ harness.getFileId(), harness.getFlushController(), harness.getMergePolicy(),
+ harness.getOperationTracker(), harness.getIOScheduler());
}
@Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeDeleteTest.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeDeleteTest.java
index 9c31b48..4202fcd 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeDeleteTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeDeleteTest.java
@@ -24,7 +24,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeTestContext;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeTestHarness;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeDeleteTest;
@@ -53,7 +52,8 @@
return LSMRTreeTestContext.create(harness.getMemBufferCache(), harness.getMemFreePageManager(),
harness.getIOManager(), harness.getOnDiskDir(), harness.getDiskBufferCache(),
harness.getDiskFileMapProvider(), fieldSerdes, valueProviderFactories, numKeys, rtreePolicyType,
- harness.getFileId(), NoMergePolicy.INSTANCE);
+ harness.getFileId(), harness.getFlushController(), harness.getMergePolicy(),
+ harness.getOperationTracker(), harness.getIOScheduler());
}
@Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeExamplesTest.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeExamplesTest.java
index 888e794..8cb9172 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeExamplesTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeExamplesTest.java
@@ -25,8 +25,6 @@
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeTestHarness;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.utils.LSMRTreeUtils;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeExamplesTest;
@@ -42,8 +40,8 @@
return LSMRTreeUtils.createLSMTree(harness.getMemBufferCache(), harness.getMemFreePageManager(),
harness.getIOManager(), harness.getOnDiskDir(), harness.getDiskBufferCache(),
harness.getDiskFileMapProvider(), typeTraits, rtreeCmpFactories, btreeCmpFactories,
- valueProviderFactories, rtreePolicyType, new ImmediateFlushPolicy(harness.getIOScheduler()),
- NoMergePolicy.INSTANCE);
+ valueProviderFactories, rtreePolicyType, harness.getFlushController(), harness.getMergePolicy(),
+ harness.getOperationTracker(), harness.getIOScheduler());
}
@Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeInsertTest.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeInsertTest.java
index 639a926..6b9fe6d 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeInsertTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeInsertTest.java
@@ -24,7 +24,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeTestContext;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeTestHarness;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeInsertTest;
@@ -53,7 +52,8 @@
return LSMRTreeTestContext.create(harness.getMemBufferCache(), harness.getMemFreePageManager(),
harness.getIOManager(), harness.getOnDiskDir(), harness.getDiskBufferCache(),
harness.getDiskFileMapProvider(), fieldSerdes, valueProviderFactories, numKeys, rtreePolicyType,
- harness.getFileId(), NoMergePolicy.INSTANCE);
+ harness.getFileId(), harness.getFlushController(), harness.getMergePolicy(),
+ harness.getOperationTracker(), harness.getIOScheduler());
}
@Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTest.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTest.java
index 8ddf58c..06fa2f9 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTest.java
@@ -24,7 +24,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeTestContext;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeTestHarness;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeTestContext;
@@ -52,7 +51,8 @@
return LSMRTreeTestContext.create(harness.getMemBufferCache(), harness.getMemFreePageManager(),
harness.getIOManager(), harness.getOnDiskDir(), harness.getDiskBufferCache(),
harness.getDiskFileMapProvider(), fieldSerdes, valueProviderFactories, numKeys, rtreePolicyType,
- harness.getFileId(), NoMergePolicy.INSTANCE);
+ harness.getFileId(), harness.getFlushController(), harness.getMergePolicy(),
+ harness.getOperationTracker(), harness.getIOScheduler());
}
@Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMultiBulkLoadTest.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMultiBulkLoadTest.java
index e3dce3d..2d84e8f 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMultiBulkLoadTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMultiBulkLoadTest.java
@@ -25,7 +25,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeTestContext;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeTestHarness;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeBulkLoadTest;
@@ -57,7 +56,8 @@
return LSMRTreeTestContext.create(harness.getMemBufferCache(), harness.getMemFreePageManager(),
harness.getIOManager(), harness.getOnDiskDir(), harness.getDiskBufferCache(),
harness.getDiskFileMapProvider(), fieldSerdes, valueProviderFactories, numKeys, rtreePolicyType,
- harness.getFileId(), NoMergePolicy.INSTANCE);
+ harness.getFileId(), harness.getFlushController(), harness.getMergePolicy(),
+ harness.getOperationTracker(), harness.getIOScheduler());
}
@Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesBulkLoadTest.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesBulkLoadTest.java
index 2f23ccf..b1bc1a3 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesBulkLoadTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesBulkLoadTest.java
@@ -24,7 +24,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeTestHarness;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeWithAntiMatterTuplesTestContext;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeBulkLoadTest;
@@ -57,7 +56,8 @@
return LSMRTreeWithAntiMatterTuplesTestContext.create(harness.getMemBufferCache(),
harness.getMemFreePageManager(), harness.getIOManager(), harness.getOnDiskDir(),
harness.getDiskBufferCache(), harness.getDiskFileMapProvider(), fieldSerdes, valueProviderFactories,
- numKeys, rtreePolicyType, harness.getFileId(), NoMergePolicy.INSTANCE);
+ numKeys, rtreePolicyType, harness.getFileId(), harness.getFlushController(), harness.getMergePolicy(),
+ harness.getOperationTracker(), harness.getIOScheduler());
}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesDeleteTest.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesDeleteTest.java
index b830317..04b7ea6 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesDeleteTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesDeleteTest.java
@@ -24,7 +24,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeTestHarness;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeWithAntiMatterTuplesTestContext;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeDeleteTest;
@@ -53,7 +52,8 @@
return LSMRTreeWithAntiMatterTuplesTestContext.create(harness.getMemBufferCache(),
harness.getMemFreePageManager(), harness.getIOManager(), harness.getOnDiskDir(),
harness.getDiskBufferCache(), harness.getDiskFileMapProvider(), fieldSerdes, valueProviderFactories,
- numKeys, rtreePolicyType, harness.getFileId(), NoMergePolicy.INSTANCE);
+ numKeys, rtreePolicyType, harness.getFileId(), harness.getFlushController(), harness.getMergePolicy(),
+ harness.getOperationTracker(), harness.getIOScheduler());
}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesExamplesTest.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesExamplesTest.java
index a972228..8c13a00 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesExamplesTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesExamplesTest.java
@@ -25,8 +25,6 @@
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeTestHarness;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.utils.LSMRTreeUtils;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeExamplesTest;
@@ -42,8 +40,8 @@
return LSMRTreeUtils.createLSMTreeWithAntiMatterTuples(harness.getMemBufferCache(),
harness.getMemFreePageManager(), harness.getIOManager(), harness.getOnDiskDir(),
harness.getDiskBufferCache(), harness.getDiskFileMapProvider(), typeTraits, rtreeCmpFactories,
- btreeCmpFactories, valueProviderFactories, rtreePolicyType,
- new ImmediateFlushPolicy(harness.getIOScheduler()), NoMergePolicy.INSTANCE);
+ btreeCmpFactories, valueProviderFactories, rtreePolicyType, harness.getFlushController(),
+ harness.getMergePolicy(), harness.getOperationTracker(), harness.getIOScheduler());
}
@Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesInsertTest.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesInsertTest.java
index 8468ed3..e938109 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesInsertTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesInsertTest.java
@@ -24,7 +24,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeTestHarness;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeWithAntiMatterTuplesTestContext;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeInsertTest;
@@ -53,7 +52,8 @@
return LSMRTreeWithAntiMatterTuplesTestContext.create(harness.getMemBufferCache(),
harness.getMemFreePageManager(), harness.getIOManager(), harness.getOnDiskDir(),
harness.getDiskBufferCache(), harness.getDiskFileMapProvider(), fieldSerdes, valueProviderFactories,
- numKeys, rtreePolicyType, harness.getFileId(), NoMergePolicy.INSTANCE);
+ numKeys, rtreePolicyType, harness.getFileId(), harness.getFlushController(), harness.getMergePolicy(),
+ harness.getOperationTracker(), harness.getIOScheduler());
}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesMergeTest.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesMergeTest.java
index 4d1ffd4..9350679 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesMergeTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesMergeTest.java
@@ -24,7 +24,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeTestHarness;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeWithAntiMatterTuplesTestContext;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeTestContext;
@@ -52,7 +51,8 @@
return LSMRTreeWithAntiMatterTuplesTestContext.create(harness.getMemBufferCache(),
harness.getMemFreePageManager(), harness.getIOManager(), harness.getOnDiskDir(),
harness.getDiskBufferCache(), harness.getDiskFileMapProvider(), fieldSerdes, valueProviderFactories,
- numKeys, rtreePolicyType, harness.getFileId(), NoMergePolicy.INSTANCE);
+ numKeys, rtreePolicyType, harness.getFileId(), harness.getFlushController(), harness.getMergePolicy(),
+ harness.getOperationTracker(), harness.getIOScheduler());
}
@Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesMultiBulkLoadTest.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesMultiBulkLoadTest.java
index cfb4bc7..20d1ef1 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesMultiBulkLoadTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesMultiBulkLoadTest.java
@@ -25,7 +25,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeTestHarness;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeWithAntiMatterTuplesTestContext;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeBulkLoadTest;
@@ -57,7 +56,8 @@
return LSMRTreeWithAntiMatterTuplesTestContext.create(harness.getMemBufferCache(),
harness.getMemFreePageManager(), harness.getIOManager(), harness.getOnDiskDir(),
harness.getDiskBufferCache(), harness.getDiskFileMapProvider(), fieldSerdes, valueProviderFactories,
- numKeys, rtreePolicyType, harness.getFileId(), NoMergePolicy.INSTANCE);
+ numKeys, rtreePolicyType, harness.getFileId(), harness.getFlushController(), harness.getMergePolicy(),
+ harness.getOperationTracker(), harness.getIOScheduler());
}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeMultiThreadTest.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeMultiThreadTest.java
index 710076a..1a8779d 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeMultiThreadTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeMultiThreadTest.java
@@ -27,8 +27,6 @@
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicy;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeTestHarness;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.utils.LSMRTreeUtils;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeMultiThreadTest;
@@ -57,8 +55,8 @@
return LSMRTreeUtils.createLSMTree(harness.getMemBufferCache(), harness.getMemFreePageManager(),
harness.getIOManager(), harness.getOnDiskDir(), harness.getDiskBufferCache(),
harness.getDiskFileMapProvider(), typeTraits, rtreeCmpFactories, btreeCmpFactories,
- valueProviderFactories, rtreePolicyType, new ImmediateFlushPolicy(harness.getIOScheduler()),
- NoMergePolicy.INSTANCE);
+ valueProviderFactories, rtreePolicyType, harness.getFlushController(), harness.getMergePolicy(),
+ harness.getOperationTracker(), harness.getIOScheduler());
}
@Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesMultiThreadTest.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesMultiThreadTest.java
index f0211d8..8616946 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesMultiThreadTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesMultiThreadTest.java
@@ -27,8 +27,6 @@
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeTestHarness;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.utils.LSMRTreeUtils;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeMultiThreadTest;
@@ -57,8 +55,8 @@
return LSMRTreeUtils.createLSMTreeWithAntiMatterTuples(harness.getMemBufferCache(),
harness.getMemFreePageManager(), harness.getIOManager(), harness.getOnDiskDir(),
harness.getDiskBufferCache(), harness.getDiskFileMapProvider(), typeTraits, rtreeCmpFactories,
- btreeCmpFactories, valueProviderFactories, rtreePolicyType, new ImmediateFlushPolicy(harness.getIOScheduler()),
- NoMergePolicy.INSTANCE);
+ btreeCmpFactories, valueProviderFactories, rtreePolicyType, harness.getFlushController(),
+ harness.getMergePolicy(), harness.getOperationTracker(), harness.getIOScheduler());
}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestContext.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestContext.java
index fbedc6a..2bbff9e 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestContext.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestContext.java
@@ -24,11 +24,12 @@
import edu.uci.ics.hyracks.dataflow.common.util.SerdeUtils;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTree;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.utils.LSMRTreeUtils;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeTestContext;
@@ -69,7 +70,8 @@
InMemoryFreePageManager memFreePageManager, IOManager ioManager, String onDiskDir,
IBufferCache diskBufferCache, IFileMapProvider diskFileMapProvider, ISerializerDeserializer[] fieldSerdes,
IPrimitiveValueProviderFactory[] valueProviderFactories, int numKeyFields, RTreePolicyType rtreePolicyType,
- int fileId, ILSMMergePolicy mergePolicy) throws Exception {
+ int fileId, ILSMFlushController flushController, ILSMMergePolicy mergePolicy,
+ ILSMOperationTracker opTracker, ILSMIOScheduler ioScheduler) throws Exception {
ITypeTraits[] typeTraits = SerdeUtils.serdesToTypeTraits(fieldSerdes);
IBinaryComparatorFactory[] rtreeCmpFactories = SerdeUtils
.serdesToComparatorFactories(fieldSerdes, numKeyFields);
@@ -77,8 +79,7 @@
fieldSerdes.length);
LSMRTree lsmTree = LSMRTreeUtils.createLSMTree(memBufferCache, memFreePageManager, ioManager, onDiskDir,
diskBufferCache, diskFileMapProvider, typeTraits, rtreeCmpFactories, btreeCmpFactories,
- valueProviderFactories, rtreePolicyType, new ImmediateFlushPolicy(SequentialScheduler.INSTANCE),
- mergePolicy);
+ valueProviderFactories, rtreePolicyType, flushController, mergePolicy, opTracker, ioScheduler);
lsmTree.create(fileId);
lsmTree.open(fileId);
LSMRTreeTestContext testCtx = new LSMRTreeTestContext(fieldSerdes, lsmTree);
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java
index 2c4432d..405f791 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java
@@ -29,10 +29,16 @@
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialScheduler;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.FlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateScheduler;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeInMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeInMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
@@ -61,6 +67,9 @@
protected LSMRTreeInMemoryFreePageManager memFreePageManager;
protected IHyracksTaskContext ctx;
protected ILSMIOScheduler ioScheduler;
+ protected ILSMFlushController flushController;
+ protected ILSMMergePolicy mergePolicy;
+ protected ILSMOperationTracker opTracker;
protected final Random rnd = new Random();
protected final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
@@ -74,7 +83,10 @@
this.memPageSize = AccessMethodTestsConfig.LSM_RTREE_MEM_PAGE_SIZE;
this.memNumPages = AccessMethodTestsConfig.LSM_RTREE_MEM_NUM_PAGES;
this.hyracksFrameSize = AccessMethodTestsConfig.LSM_RTREE_HYRACKS_FRAME_SIZE;
- this.ioScheduler = SequentialScheduler.INSTANCE;
+ this.ioScheduler = ImmediateScheduler.INSTANCE;
+ this.mergePolicy = NoMergePolicy.INSTANCE;
+ this.flushController = new FlushController();
+ this.opTracker = new RefCountingOperationTracker();
}
public LSMRTreeTestHarness(int diskPageSize, int diskNumPages, int diskMaxOpenFiles, int memPageSize,
@@ -85,7 +97,10 @@
this.memPageSize = memPageSize;
this.memNumPages = memNumPages;
this.hyracksFrameSize = hyracksFrameSize;
- this.ioScheduler = SequentialScheduler.INSTANCE;
+ this.ioScheduler = ImmediateScheduler.INSTANCE;
+ this.mergePolicy = NoMergePolicy.INSTANCE;
+ this.flushController = new FlushController();
+ this.opTracker = new RefCountingOperationTracker();
}
public void setUp() throws HyracksException {
@@ -101,6 +116,7 @@
}
public void tearDown() throws HyracksDataException {
+ ioScheduler.shutdown();
diskBufferCache.close();
for (IODeviceHandle dev : ioManager.getIODevices()) {
File dir = new File(dev.getPath(), onDiskDir);
@@ -183,4 +199,16 @@
public ILSMIOScheduler getIOScheduler() {
return ioScheduler;
}
+
+ public ILSMOperationTracker getOperationTracker() {
+ return opTracker;
+ }
+
+ public ILSMFlushController getFlushController() {
+ return flushController;
+ }
+
+ public ILSMMergePolicy getMergePolicy() {
+ return mergePolicy;
+ }
}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeWithAntiMatterTuplesTestContext.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeWithAntiMatterTuplesTestContext.java
index 410adca..4b2e010 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeWithAntiMatterTuplesTestContext.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeWithAntiMatterTuplesTestContext.java
@@ -24,11 +24,12 @@
import edu.uci.ics.hyracks.dataflow.common.util.SerdeUtils;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeWithAntiMatterTuples;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.utils.LSMRTreeUtils;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeTestContext;
@@ -69,7 +70,8 @@
InMemoryFreePageManager memFreePageManager, IOManager ioManager, String onDiskDir,
IBufferCache diskBufferCache, IFileMapProvider diskFileMapProvider, ISerializerDeserializer[] fieldSerdes,
IPrimitiveValueProviderFactory[] valueProviderFactories, int numKeyFields, RTreePolicyType rtreePolicyType,
- int fileId, ILSMMergePolicy mergePolicy) throws Exception {
+ int fileId, ILSMFlushController flushController, ILSMMergePolicy mergePolicy,
+ ILSMOperationTracker opTracker, ILSMIOScheduler ioScheduler) throws Exception {
ITypeTraits[] typeTraits = SerdeUtils.serdesToTypeTraits(fieldSerdes);
IBinaryComparatorFactory[] rtreeCmpFactories = SerdeUtils
.serdesToComparatorFactories(fieldSerdes, numKeyFields);
@@ -77,8 +79,8 @@
fieldSerdes.length);
LSMRTreeWithAntiMatterTuples lsmTree = LSMRTreeUtils.createLSMTreeWithAntiMatterTuples(memBufferCache,
memFreePageManager, ioManager, onDiskDir, diskBufferCache, diskFileMapProvider, typeTraits,
- rtreeCmpFactories, btreeCmpFactories, valueProviderFactories, rtreePolicyType, new ImmediateFlushPolicy(SequentialScheduler.INSTANCE),
- mergePolicy);
+ rtreeCmpFactories, btreeCmpFactories, valueProviderFactories, rtreePolicyType, flushController,
+ mergePolicy, opTracker, ioScheduler);
lsmTree.create(fileId);
lsmTree.open(fileId);
LSMRTreeWithAntiMatterTuplesTestContext testCtx = new LSMRTreeWithAntiMatterTuplesTestContext(fieldSerdes,