separated IO scheduling from flush and merge policies
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1569 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
index adacb2b..1687af8 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
@@ -19,7 +19,8 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialFlushPolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialSchedulerProvider;
import edu.uci.ics.hyracks.tests.am.common.LSMTreeOperatorTestHelper;
public class LSMBTreeOperatorTestHelper extends LSMTreeOperatorTestHelper {
@@ -31,8 +32,9 @@
}
public IIndexDataflowHelperFactory createDataFlowHelperFactory() {
- return new LSMBTreeDataflowHelperFactory(new SequentialFlushPolicyProvider(), new ConstantMergePolicyProvider(
- MERGE_THRESHOLD));
+ return new LSMBTreeDataflowHelperFactory(
+ new ImmediateFlushPolicyProvider(SequentialSchedulerProvider.INSTANCE),
+ new ConstantMergePolicyProvider(SequentialSchedulerProvider.INSTANCE, MERGE_THRESHOLD));
}
}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
index b0e220c..daa3381 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
@@ -20,7 +20,8 @@
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialFlushPolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.dataflow.LSMRTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.tests.am.common.LSMTreeOperatorTestHelper;
@@ -35,7 +36,8 @@
public IIndexDataflowHelperFactory createDataFlowHelperFactory(
IPrimitiveValueProviderFactory[] valueProviderFactories, IBinaryComparatorFactory[] btreeComparatorFactories) {
return new LSMRTreeDataflowHelperFactory(valueProviderFactories, btreeComparatorFactories,
- new SequentialFlushPolicyProvider(), new ConstantMergePolicyProvider(MERGE_THRESHOLD));
+ new ImmediateFlushPolicyProvider(SequentialSchedulerProvider.INSTANCE),
+ new ConstantMergePolicyProvider(SequentialSchedulerProvider.INSTANCE, MERGE_THRESHOLD));
}
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFlushPolicy.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFlushPolicy.java
index 1ab189c..a977911 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFlushPolicy.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFlushPolicy.java
@@ -2,5 +2,5 @@
public interface ILSMFlushPolicy {
- public void memoryComponentFull(ILSMIndex index);
+ public void memoryComponentExceededThreshold(ILSMIndex index);
}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOScheduler.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOScheduler.java
new file mode 100644
index 0000000..866e208
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOScheduler.java
@@ -0,0 +1,7 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+public interface ILSMIOScheduler {
+ public void scheduleFlush(ILSMIndex index);
+
+ public void scheduleMerge(ILSMIndex index);
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOSchedulerProvider.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOSchedulerProvider.java
new file mode 100644
index 0000000..c1e8ecb
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOSchedulerProvider.java
@@ -0,0 +1,7 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import java.io.Serializable;
+
+public interface ILSMIOSchedulerProvider extends Serializable {
+ public ILSMIOScheduler getIOScheduler();
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
index ee1b246..ae940e4 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
@@ -2,5 +2,5 @@
public interface ILSMMergePolicy {
- public void componentAdded(ILSMIndex index, int totalNumDiskComponents, boolean mergeInProgress);
+ public void diskComponentAdded(ILSMIndex index, int totalNumDiskComponents);
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
index e5f92a9..cff8d9b 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
@@ -1,44 +1,24 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
public class ConstantMergePolicy implements ILSMMergePolicy {
- private final ExecutorService executor;
+ private final ILSMIOScheduler ioScheduler;
private final int threshold;
- public ConstantMergePolicy(int threshold) {
- this.executor = Executors.newCachedThreadPool();
+ public ConstantMergePolicy(ILSMIOScheduler ioScheduler, int threshold) {
+ this.ioScheduler = ioScheduler;
this.threshold = threshold;
}
@Override
- public void componentAdded(final ILSMIndex index, int totalNumDiskComponents, boolean mergeInProgress) {
- synchronized (index) {
- if (totalNumDiskComponents >= threshold && !mergeInProgress) {
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- ((ILSMIndexAccessor) index.createAccessor()).merge();
- } catch (HyracksDataException e) {
- e.printStackTrace();
- } catch (IndexException e) {
- e.printStackTrace();
- }
- }
- });
-
- }
+ public void diskComponentAdded(final ILSMIndex index, int totalNumDiskComponents) {
+ if (totalNumDiskComponents >= threshold) {
+ ioScheduler.scheduleMerge(index);
}
}
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyProvider.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyProvider.java
index 0d18077..c68b425 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyProvider.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyProvider.java
@@ -1,5 +1,6 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
@@ -7,15 +8,18 @@
private static final long serialVersionUID = 1L;
+ private final ILSMIOSchedulerProvider schedulerProvider;
+
private final int threshold;
- public ConstantMergePolicyProvider(int threshold) {
+ public ConstantMergePolicyProvider(ILSMIOSchedulerProvider schedulerProvider, int threshold) {
+ this.schedulerProvider = schedulerProvider;
this.threshold = threshold;
}
@Override
public ILSMMergePolicy getMergePolicy() {
- return new ConstantMergePolicy(threshold);
+ return new ConstantMergePolicy(schedulerProvider.getIOScheduler(), threshold);
}
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateFlushPolicy.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateFlushPolicy.java
new file mode 100644
index 0000000..0fb6545
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateFlushPolicy.java
@@ -0,0 +1,20 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+public class ImmediateFlushPolicy implements ILSMFlushPolicy {
+
+ private final ILSMIOScheduler ioScheduler;
+
+ public ImmediateFlushPolicy(ILSMIOScheduler ioScheduler) {
+ this.ioScheduler = ioScheduler;
+ }
+
+ @Override
+ public void memoryComponentExceededThreshold(final ILSMIndex index) {
+ // Schedule a flush immediately when the memory component is full
+ ioScheduler.scheduleFlush(index);
+ }
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateFlushPolicyProvider.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateFlushPolicyProvider.java
new file mode 100644
index 0000000..8e73c09
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateFlushPolicyProvider.java
@@ -0,0 +1,22 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOSchedulerProvider;
+
+public class ImmediateFlushPolicyProvider implements ILSMFlushPolicyProvider {
+
+ private static final long serialVersionUID = 1L;
+
+ private final ILSMIOSchedulerProvider schedulerProvider;
+
+ public ImmediateFlushPolicyProvider(ILSMIOSchedulerProvider schedulerProvider) {
+ this.schedulerProvider = schedulerProvider;
+ }
+
+ @Override
+ public ILSMFlushPolicy getFlushPolicy() {
+ return new ImmediateFlushPolicy(schedulerProvider.getIOScheduler());
+ }
+
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index b942d9e..7900b29 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -93,7 +93,7 @@
// Flush will only be handled by last exiting thread.
if (flushFlag && threadRefCount == 0) {
- flushPolicy.memoryComponentFull(lsmIndex);
+ flushPolicy.memoryComponentExceededThreshold(lsmIndex);
}
}
}
@@ -145,7 +145,7 @@
lsmIndex.resetInMemoryComponent();
synchronized (diskComponentsSync) {
lsmIndex.addFlushedComponent(newComponent);
- mergePolicy.componentAdded(lsmIndex, lsmIndex.getDiskComponents().size(), isMerging.get());
+ mergePolicy.diskComponentAdded(lsmIndex, lsmIndex.getDiskComponents().size());
}
// Unblock entering threads waiting for the flush
@@ -279,7 +279,7 @@
lsmIndex.getComponentFinalizer().finalize(index);
synchronized (diskComponentsSync) {
lsmIndex.addFlushedComponent(index);
- mergePolicy.componentAdded(lsmIndex, lsmIndex.getDiskComponents().size(), isMerging.get());
+ mergePolicy.diskComponentAdded(lsmIndex, lsmIndex.getDiskComponents().size());
}
}
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
index d18436b..5d36c09 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
@@ -7,7 +7,7 @@
INSTANCE;
@Override
- public void componentAdded(ILSMIndex index, int totalNumDiskComponents, boolean mergeInProgress) {
+ public void diskComponentAdded(ILSMIndex index, int totalNumDiskComponents) {
// Do nothing
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialFlushPolicy.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialFlushPolicy.java
deleted file mode 100644
index c7cbd18..0000000
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialFlushPolicy.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-
-public enum SequentialFlushPolicy implements ILSMFlushPolicy {
- INSTANCE;
-
- private static final Logger LOGGER = Logger.getLogger(SequentialFlushPolicy.class.getName());
-
- private final ExecutorService executor;
-
- private SequentialFlushPolicy() {
- executor = Executors.newSingleThreadExecutor();
- }
-
- @Override
- public void memoryComponentFull(final ILSMIndex index) {
- executor.submit(new Runnable() {
- public void run() {
- try {
- LOGGER.info("flushing");
- ((ILSMIndexAccessor) index.createAccessor()).flush();
- LOGGER.info("finished flushing");
- } catch (HyracksDataException e) {
- LOGGER.info(e.getMessage());
- } catch (IndexException e) {
- LOGGER.info(e.getMessage());
- }
- LOGGER.info("Thread should end");
- }
- });
- }
-}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialFlushPolicyProvider.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialFlushPolicyProvider.java
deleted file mode 100644
index e48b892..0000000
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialFlushPolicyProvider.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushPolicyProvider;
-
-public class SequentialFlushPolicyProvider implements ILSMFlushPolicyProvider {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public ILSMFlushPolicy getFlushPolicy() {
- return SequentialFlushPolicy.INSTANCE;
- }
-
-}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialScheduler.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialScheduler.java
new file mode 100644
index 0000000..8be892b
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialScheduler.java
@@ -0,0 +1,53 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+
+public enum SequentialScheduler implements ILSMIOScheduler {
+ INSTANCE;
+
+ private final ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ @Override
+ public void scheduleFlush(final ILSMIndex index) {
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ ((ILSMIndexAccessor) index.createAccessor()).flush();
+ } catch (HyracksDataException e) {
+ e.printStackTrace();
+ } catch (IndexException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+
+ @Override
+ public void scheduleMerge(final ILSMIndex index) {
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ ((ILSMIndexAccessor) index.createAccessor()).merge();
+ } catch (LSMMergeInProgressException e) {
+ // Ignore!
+ } catch (HyracksDataException e) {
+ e.printStackTrace();
+ } catch (IndexException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialSchedulerProvider.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialSchedulerProvider.java
new file mode 100644
index 0000000..83e2136
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SequentialSchedulerProvider.java
@@ -0,0 +1,14 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOSchedulerProvider;
+
+public enum SequentialSchedulerProvider implements ILSMIOSchedulerProvider {
+ INSTANCE;
+
+ @Override
+ public ILSMIOScheduler getIOScheduler() {
+ return SequentialScheduler.INSTANCE;
+ }
+
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java
index 91232d1..0b2c645 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java
@@ -27,8 +27,8 @@
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialFlushPolicy;
public class LSMBTreeExamplesTest extends OrderedIndexExamplesTest {
private final LSMBTreeTestHarness harness = new LSMBTreeTestHarness();
@@ -39,7 +39,7 @@
return LSMBTreeUtils.createLSMTree(harness.getMemBufferCache(), harness.getMemOpCallback(),
harness.getMemFreePageManager(), harness.getIOManager(), harness.getOnDiskDir(),
harness.getDiskBufferCache(), harness.getDiskFileMapProvider(), typeTraits, cmpFactories,
- SequentialFlushPolicy.INSTANCE, NoMergePolicy.INSTANCE);
+ new ImmediateFlushPolicy(harness.getIOScheduler()), NoMergePolicy.INSTANCE);
}
@Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java
index afe54a5..8dbfd75 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java
@@ -29,8 +29,8 @@
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialFlushPolicy;
public class LSMBTreeMultiThreadTest extends OrderedIndexMultiThreadTest {
@@ -54,7 +54,7 @@
return LSMBTreeUtils.createLSMTree(harness.getMemBufferCache(), harness.getMemOpCallback(),
harness.getMemFreePageManager(), harness.getIOManager(), harness.getOnDiskDir(),
harness.getDiskBufferCache(), harness.getDiskFileMapProvider(), typeTraits, cmpFactories,
- SequentialFlushPolicy.INSTANCE, NoMergePolicy.INSTANCE);
+ new ImmediateFlushPolicy(harness.getIOScheduler()), NoMergePolicy.INSTANCE);
}
@Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
index 94bc112..d244af4 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
@@ -34,8 +34,9 @@
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialScheduler;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -84,8 +85,8 @@
new LIFOMetaDataFrameFactory());
lsmtree = LSMBTreeUtils.createLSMTree(memBufferCache, NoOpOperationCallback.INSTANCE, memFreePageManager,
- ioManager, onDiskDir, bufferCache, fmp, typeTraits, cmpFactories, SequentialFlushPolicy.INSTANCE,
- NoMergePolicy.INSTANCE);
+ ioManager, onDiskDir, bufferCache, fmp, typeTraits, cmpFactories, new ImmediateFlushPolicy(
+ SequentialScheduler.INSTANCE), NoMergePolicy.INSTANCE);
}
@Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestContext.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestContext.java
index 1ff997d..c59e223 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestContext.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestContext.java
@@ -30,7 +30,8 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialScheduler;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -69,7 +70,7 @@
IBinaryComparatorFactory[] cmpFactories = SerdeUtils.serdesToComparatorFactories(fieldSerdes, numKeyFields);
LSMBTree lsmTree = LSMBTreeUtils.createLSMTree(memBufferCache, NoOpOperationCallback.INSTANCE,
memFreePageManager, ioManager, onDiskDir, diskBufferCache, diskFileMapProvider, typeTraits,
- cmpFactories, SequentialFlushPolicy.INSTANCE, mergePolicy);
+ cmpFactories, new ImmediateFlushPolicy(SequentialScheduler.INSTANCE), mergePolicy);
lsmTree.create(fileId);
lsmTree.open(fileId);
LSMBTreeTestContext testCtx = new LSMBTreeTestContext(fieldSerdes, lsmTree);
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java
index 63f3c75..7143448 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java
@@ -31,8 +31,10 @@
import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialScheduler;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -41,9 +43,9 @@
public class LSMBTreeTestHarness {
protected static final Logger LOGGER = Logger.getLogger(LSMBTreeTestHarness.class.getName());
-
+
public static final BTreeLeafFrameType[] LEAF_FRAMES_TO_TEST = new BTreeLeafFrameType[] { BTreeLeafFrameType.REGULAR_NSM };
-
+
private static final long RANDOM_SEED = 50;
private static final int DEFAULT_DISK_PAGE_SIZE = 256;
private static final int DEFAULT_DISK_NUM_PAGES = 1000;
@@ -51,48 +53,45 @@
private static final int DEFAULT_MEM_PAGE_SIZE = 256;
private static final int DEFAULT_MEM_NUM_PAGES = 100;
private static final int DEFAULT_HYRACKS_FRAME_SIZE = 128;
- private static final int DUMMY_FILE_ID = -1;
-
+ private static final int DUMMY_FILE_ID = -1;
+
protected final int diskPageSize;
protected final int diskNumPages;
protected final int diskMaxOpenFiles;
protected final int memPageSize;
protected final int memNumPages;
protected final int hyracksFrameSize;
-
+
protected IOManager ioManager;
protected IBufferCache diskBufferCache;
protected IFileMapProvider diskFileMapProvider;
protected InMemoryBufferCache memBufferCache;
protected InMemoryFreePageManager memFreePageManager;
protected IHyracksTaskContext ctx;
-
+ protected ILSMIOScheduler ioScheduler;
+
protected final Random rnd = new Random();
protected final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
- protected final static String sep = System.getProperty("file.separator");
+ protected final static String sep = System.getProperty("file.separator");
protected String onDiskDir;
-
+
public LSMBTreeTestHarness() {
- this.diskPageSize = DEFAULT_DISK_PAGE_SIZE;
- this.diskNumPages = DEFAULT_DISK_NUM_PAGES;
- this.diskMaxOpenFiles = DEFAULT_DISK_MAX_OPEN_FILES;
- this.memPageSize = DEFAULT_MEM_PAGE_SIZE;
- this.memNumPages = DEFAULT_MEM_NUM_PAGES;
- this.hyracksFrameSize = DEFAULT_HYRACKS_FRAME_SIZE;
+ this(DEFAULT_DISK_PAGE_SIZE, DEFAULT_DISK_NUM_PAGES, DEFAULT_DISK_MAX_OPEN_FILES, DEFAULT_MEM_PAGE_SIZE,
+ DEFAULT_MEM_NUM_PAGES, DEFAULT_HYRACKS_FRAME_SIZE);
}
-
- public LSMBTreeTestHarness(int diskPageSize, int diskNumPages,
- int diskMaxOpenFiles, int memPageSize, int memNumPages,
- int hyracksFrameSize) {
- this.diskPageSize = diskPageSize;
- this.diskNumPages = diskNumPages;
- this.diskMaxOpenFiles = diskMaxOpenFiles;
- this.memPageSize = memPageSize;
- this.memNumPages = memNumPages;
- this.hyracksFrameSize = hyracksFrameSize;
- }
-
- public void setUp() throws HyracksException {
+
+ public LSMBTreeTestHarness(int diskPageSize, int diskNumPages, int diskMaxOpenFiles, int memPageSize,
+ int memNumPages, int hyracksFrameSize) {
+ this.diskPageSize = diskPageSize;
+ this.diskNumPages = diskNumPages;
+ this.diskMaxOpenFiles = diskMaxOpenFiles;
+ this.memPageSize = memPageSize;
+ this.memNumPages = memNumPages;
+ this.hyracksFrameSize = hyracksFrameSize;
+ this.ioScheduler = SequentialScheduler.INSTANCE;
+ }
+
+ public void setUp() throws HyracksException {
onDiskDir = "lsm_btree_" + simpleDateFormat.format(new Date()) + sep;
ctx = TestUtils.create(getHyracksFrameSize());
TestStorageManagerComponentHolder.init(diskPageSize, diskNumPages, diskMaxOpenFiles);
@@ -103,10 +102,10 @@
ioManager = TestStorageManagerComponentHolder.getIOManager();
rnd.setSeed(RANDOM_SEED);
}
-
+
public void tearDown() throws HyracksDataException {
diskBufferCache.close();
- for(IODeviceHandle dev : ioManager.getIODevices()) {
+ for (IODeviceHandle dev : ioManager.getIODevices()) {
File dir = new File(dev.getPath(), onDiskDir);
FilenameFilter filter = new FilenameFilter() {
public boolean accept(File dir, String name) {
@@ -123,68 +122,72 @@
dir.delete();
}
}
-
+
public int getDiskPageSize() {
return diskPageSize;
}
-
+
public int getDiskNumPages() {
return diskNumPages;
}
-
+
public int getDiskMaxOpenFiles() {
return diskMaxOpenFiles;
}
-
+
public int getMemPageSize() {
return memPageSize;
}
-
+
public int getMemNumPages() {
return memNumPages;
}
-
+
public int getHyracksFrameSize() {
return hyracksFrameSize;
}
-
+
public int getFileId() {
- return DUMMY_FILE_ID;
+ return DUMMY_FILE_ID;
}
-
+
public IOManager getIOManager() {
return ioManager;
}
-
+
public IBufferCache getDiskBufferCache() {
- return diskBufferCache;
+ return diskBufferCache;
}
-
+
public IFileMapProvider getDiskFileMapProvider() {
- return diskFileMapProvider;
+ return diskFileMapProvider;
}
-
+
public InMemoryBufferCache getMemBufferCache() {
- return memBufferCache;
+ return memBufferCache;
}
-
+
public InMemoryFreePageManager getMemFreePageManager() {
- return memFreePageManager;
+ return memFreePageManager;
}
-
+
public IHyracksTaskContext getHyracksTastContext() {
- return ctx;
+ return ctx;
}
-
+
public String getOnDiskDir() {
- return onDiskDir;
+ return onDiskDir;
}
-
+
public Random getRandom() {
return rnd;
}
-
+
public IOperationCallback getMemOpCallback() {
return NoOpOperationCallback.INSTANCE;
}
+
+ public ILSMIOScheduler getIOScheduler() {
+ return ioScheduler;
+ }
}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeExamplesTest.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeExamplesTest.java
index 72703c8..fc80861 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeExamplesTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeExamplesTest.java
@@ -25,8 +25,8 @@
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialFlushPolicy;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeTestHarness;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.utils.LSMRTreeUtils;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeExamplesTest;
@@ -41,7 +41,7 @@
return LSMRTreeUtils.createLSMTree(harness.getMemBufferCache(), harness.getMemFreePageManager(),
harness.getIOManager(), harness.getOnDiskDir(), harness.getDiskBufferCache(),
harness.getDiskFileMapProvider(), typeTraits, rtreeCmpFactories, btreeCmpFactories,
- valueProviderFactories, SequentialFlushPolicy.INSTANCE, NoMergePolicy.INSTANCE);
+ valueProviderFactories, new ImmediateFlushPolicy(harness.getIOScheduler()), NoMergePolicy.INSTANCE);
}
@Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeMultiThreadTest.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeMultiThreadTest.java
index 033200c..b689cf5 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeMultiThreadTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeMultiThreadTest.java
@@ -28,7 +28,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicy;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.util.LSMRTreeTestHarness;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.utils.LSMRTreeUtils;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeMultiThreadTest;
@@ -56,7 +56,7 @@
return LSMRTreeUtils.createLSMTree(harness.getMemBufferCache(), harness.getMemFreePageManager(),
harness.getIOManager(), harness.getOnDiskDir(), harness.getDiskBufferCache(),
harness.getDiskFileMapProvider(), typeTraits, rtreeCmpFactories, btreeCmpFactories,
- valueProviderFactories, SequentialFlushPolicy.INSTANCE, NoMergePolicy.INSTANCE);
+ valueProviderFactories, new ImmediateFlushPolicy(harness.getIOScheduler()), NoMergePolicy.INSTANCE);
}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestContext.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestContext.java
index 3ba6206..bf40384 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestContext.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestContext.java
@@ -27,7 +27,8 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateFlushPolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTree;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.utils.LSMRTreeUtils;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeTestContext;
@@ -75,7 +76,7 @@
fieldSerdes.length);
LSMRTree lsmTree = LSMRTreeUtils.createLSMTree(memBufferCache, memFreePageManager, ioManager, onDiskDir,
diskBufferCache, diskFileMapProvider, typeTraits, rtreeCmpFactories, btreeCmpFactories,
- valueProviderFactories, SequentialFlushPolicy.INSTANCE, mergePolicy);
+ valueProviderFactories, new ImmediateFlushPolicy(SequentialScheduler.INSTANCE), mergePolicy);
lsmTree.create(fileId);
lsmTree.open(fileId);
LSMRTreeTestContext testCtx = new LSMRTreeTestContext(fieldSerdes, lsmTree);
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java
index bdde7ee..756c746 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestHarness.java
@@ -28,8 +28,10 @@
import edu.uci.ics.hyracks.api.io.IODeviceHandle;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SequentialScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeInMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeInMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
@@ -63,6 +65,7 @@
protected LSMRTreeInMemoryBufferCache memBufferCache;
protected LSMRTreeInMemoryFreePageManager memFreePageManager;
protected IHyracksTaskContext ctx;
+ protected ILSMIOScheduler ioScheduler;
protected final Random rnd = new Random();
protected final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
@@ -70,12 +73,8 @@
protected String onDiskDir;
public LSMRTreeTestHarness() {
- this.diskPageSize = DEFAULT_DISK_PAGE_SIZE;
- this.diskNumPages = DEFAULT_DISK_NUM_PAGES;
- this.diskMaxOpenFiles = DEFAULT_DISK_MAX_OPEN_FILES;
- this.memPageSize = DEFAULT_MEM_PAGE_SIZE;
- this.memNumPages = DEFAULT_MEM_NUM_PAGES;
- this.hyracksFrameSize = DEFAULT_HYRACKS_FRAME_SIZE;
+ this(DEFAULT_DISK_PAGE_SIZE, DEFAULT_DISK_NUM_PAGES, DEFAULT_DISK_MAX_OPEN_FILES, DEFAULT_MEM_PAGE_SIZE,
+ DEFAULT_MEM_NUM_PAGES, DEFAULT_HYRACKS_FRAME_SIZE);
}
public LSMRTreeTestHarness(int diskPageSize, int diskNumPages, int diskMaxOpenFiles, int memPageSize,
@@ -86,6 +85,7 @@
this.memPageSize = memPageSize;
this.memNumPages = memNumPages;
this.hyracksFrameSize = hyracksFrameSize;
+ this.ioScheduler = SequentialScheduler.INSTANCE;
}
public void setUp() throws HyracksException {
@@ -179,4 +179,8 @@
public Random getRandom() {
return rnd;
}
+
+ public ILSMIOScheduler getIOScheduler() {
+ return ioScheduler;
+ }
}