[ASTERIXDB-2541][STO] Introduce GreedyScheduler
- user model changes: yes.
Add new option: storage.io.scheduler (async/greedy)
- storage format changes: no.
- interface changes: yes.
Introduce IIndexCursorStats
Details:
- Introduce GreedyScheduler that always executes the merge
operation with the smallest number of remaining pages to minimize
the number of disk components
- Introduce IIndexCursorStats to collect the statistics of index scans.
This allows GreedyScheduler to know the remaning pages of merge
operations.
- Extend AbstractIoOperation so that GreedyScheduler can pause/resume
merge operations if needed.
Change-Id: I38fe394d1180d4e3f6796064c0e6c6630b6ad303
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3284
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index fc347c9..13ca95d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -93,6 +93,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.impls.AsynchronousScheduler;
import org.apache.hyracks.storage.am.lsm.common.impls.ConcurrentMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.impls.GreedyScheduler;
import org.apache.hyracks.storage.common.ILocalResourceRepository;
import org.apache.hyracks.storage.common.buffercache.BufferCache;
import org.apache.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
@@ -184,7 +185,7 @@
IPageCleanerPolicy pcp = new DelayPageCleanerPolicy(600000);
IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator,
storageProperties.getBufferCachePageSize(), storageProperties.getBufferCacheNumPages());
- lsmIOScheduler = new AsynchronousScheduler(getServiceContext().getThreadFactory(), HaltCallback.INSTANCE);
+ lsmIOScheduler = createIoScheduler(storageProperties);
metadataMergePolicyFactory = new ConcurrentMergePolicyFactory();
indexCheckpointManagerProvider = new IndexCheckpointManagerProvider(ioManager);
ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory =
@@ -552,4 +553,24 @@
public IConfigValidator getConfigValidator() {
return configValidator;
}
+
+ private ILSMIOOperationScheduler createIoScheduler(StorageProperties properties) {
+ String schedulerName = storageProperties.getIoScheduler();
+ ILSMIOOperationScheduler ioScheduler = null;
+ if (AsynchronousScheduler.FACTORY.getName().equalsIgnoreCase(schedulerName)) {
+ ioScheduler = AsynchronousScheduler.FACTORY.createIoScheduler(getServiceContext().getThreadFactory(),
+ HaltCallback.INSTANCE);
+ } else if (GreedyScheduler.FACTORY.getName().equalsIgnoreCase(schedulerName)) {
+ ioScheduler = GreedyScheduler.FACTORY.createIoScheduler(getServiceContext().getThreadFactory(),
+ HaltCallback.INSTANCE);
+ } else {
+ if (LOGGER.isWarnEnabled()) {
+ LOGGER.log(Level.WARN,
+ "Unknown storage I/O scheduler: " + schedulerName + "; defaulting to greedy I/O scheduler.");
+ }
+ ioScheduler = GreedyScheduler.FACTORY.createIoScheduler(getServiceContext().getThreadFactory(),
+ HaltCallback.INSTANCE);
+ }
+ return ioScheduler;
+ }
}
diff --git a/asterixdb/asterix-app/src/main/resources/cc.conf b/asterixdb/asterix-app/src/main/resources/cc.conf
index 36e925b..1b8e034 100644
--- a/asterixdb/asterix-app/src/main/resources/cc.conf
+++ b/asterixdb/asterix-app/src/main/resources/cc.conf
@@ -38,6 +38,7 @@
storage.buffercache.pagesize=32KB
storage.buffercache.size=128MB
storage.memorycomponent.globalbudget=512MB
+storage.io.scheduler=greedy
[cc]
address = 127.0.0.1
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
index d841b5a..a0d5ceb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
@@ -51,7 +51,8 @@
STORAGE_LSM_BLOOMFILTER_FALSEPOSITIVERATE(DOUBLE, 0.01d),
STORAGE_MAX_ACTIVE_WRITABLE_DATASETS(UNSIGNED_INTEGER, 8),
STORAGE_COMPRESSION_BLOCK(STRING, "none"),
- STORAGE_DISK_FORCE_BYTES(LONG_BYTE_UNIT, StorageUtil.getLongSizeInBytes(16, MEGABYTE));
+ STORAGE_DISK_FORCE_BYTES(LONG_BYTE_UNIT, StorageUtil.getLongSizeInBytes(16, MEGABYTE)),
+ STORAGE_IO_SCHEDULER(STRING, "greedy");
private final IOptionType interpreter;
private final Object defaultValue;
@@ -96,6 +97,8 @@
return "The default compression scheme for the storage";
case STORAGE_DISK_FORCE_BYTES:
return "The number of bytes before each disk force (fsync)";
+ case STORAGE_IO_SCHEDULER:
+ return "The I/O scheduler for LSM flush and merge operations";
default:
throw new IllegalStateException("NYI: " + this);
}
@@ -191,6 +194,10 @@
return accessor.getString(Option.STORAGE_COMPRESSION_BLOCK);
}
+ public String getIoScheduler() {
+ return accessor.getString(Option.STORAGE_IO_SCHEDULER);
+ }
+
protected int getMetadataDatasets() {
return MetadataIndexImmutableProperties.METADATA_DATASETS_COUNT;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
index 9314f24..c34a671 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
@@ -26,6 +26,8 @@
// Hyracks task context
public static final String HYRACKS_TASK_CONTEXT = "HYRACKS_TASK_CONTEXT";
+ public static final String INDEX_CURSOR_STATS = "INDEX_CURSOR_STATS";
+
private HyracksConstants() {
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
index 744f25d..3a062af 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
@@ -30,6 +30,7 @@
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.data.std.primitive.IntegerPointable;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.dataflow.common.utils.TupleUtils;
@@ -57,10 +58,12 @@
import org.apache.hyracks.storage.common.IIndexAccessor;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.NoOpIndexCursorStats;
import org.apache.hyracks.storage.common.buffercache.BufferCache;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
@@ -816,7 +819,7 @@
@Override
public BTreeAccessor createAccessor(IIndexAccessParameters iap) {
- return new BTreeAccessor(this, iap.getModificationCallback(), iap.getSearchOperationCallback());
+ return new BTreeAccessor(this, iap);
}
// TODO: Class should be private. But currently we need to expose the
@@ -832,18 +835,19 @@
protected BTree btree;
protected BTreeOpContext ctx;
private boolean destroyed = false;
+ protected IIndexAccessParameters iap;
- public BTreeAccessor(BTree btree, IModificationOperationCallback modificationCalback,
- ISearchOperationCallback searchCallback) {
+ public BTreeAccessor(BTree btree, IIndexAccessParameters iap) {
this.btree = btree;
- this.ctx = btree.createOpContext(this, modificationCalback, searchCallback);
+ this.ctx = btree.createOpContext(this, iap.getModificationCallback(), iap.getSearchOperationCallback());
+ this.iap = iap;
}
- public void reset(BTree btree, IModificationOperationCallback modificationCallback,
- ISearchOperationCallback searchCallback) {
+ public void reset(BTree btree, IIndexAccessParameters iap) {
this.btree = btree;
- ctx.setCallbacks(modificationCallback, searchCallback);
+ ctx.setCallbacks(iap.getModificationCallback(), iap.getSearchOperationCallback());
ctx.reset();
+ this.iap = iap;
}
@Override
@@ -879,7 +883,8 @@
@Override
public BTreeRangeSearchCursor createSearchCursor(boolean exclusive) {
IBTreeLeafFrame leafFrame = (IBTreeLeafFrame) btree.getLeafFrameFactory().createFrame();
- return new BTreeRangeSearchCursor(leafFrame, exclusive);
+ return new BTreeRangeSearchCursor(leafFrame, exclusive, (IIndexCursorStats) iap.getParameters()
+ .getOrDefault(HyracksConstants.INDEX_CURSOR_STATS, NoOpIndexCursorStats.INSTANCE));
}
public BTreeRangeSearchCursor createPointCursor(boolean exclusive) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeRangeSearchCursor.java
index bff1bcb..9ccd881 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeRangeSearchCursor.java
@@ -32,9 +32,11 @@
import org.apache.hyracks.storage.common.EnforcedIndexCursor;
import org.apache.hyracks.storage.common.ICursorInitialState;
import org.apache.hyracks.storage.common.IIndexAccessor;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.NoOpIndexCursorStats;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
@@ -73,12 +75,19 @@
protected ITupleReference lowKey;
protected ITupleReference highKey;
+ protected final IIndexCursorStats stats;
+
public BTreeRangeSearchCursor(IBTreeLeafFrame frame, boolean exclusiveLatchNodes) {
+ this(frame, exclusiveLatchNodes, NoOpIndexCursorStats.INSTANCE);
+ }
+
+ public BTreeRangeSearchCursor(IBTreeLeafFrame frame, boolean exclusiveLatchNodes, IIndexCursorStats stats) {
this.frame = frame;
this.frameTuple = frame.createTupleReference();
this.exclusiveLatchNodes = exclusiveLatchNodes;
this.reusablePredicate = new RangePredicate();
this.reconciliationTuple = new ArrayTupleReference();
+ this.stats = stats;
}
@Override
@@ -300,6 +309,7 @@
} else {
nextPage.acquireReadLatch();
}
+ stats.getPageCounter().update(1);
return nextPage;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTree.java
index 523ed9b..6459471 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTree.java
@@ -22,6 +22,7 @@
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.btree.api.IBTreeFrame;
import org.apache.hyracks.storage.am.btree.api.IBTreeLeafFrame;
@@ -33,10 +34,10 @@
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.IIndexCursor;
-import org.apache.hyracks.storage.common.IModificationOperationCallback;
-import org.apache.hyracks.storage.common.ISearchOperationCallback;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.NoOpIndexCursorStats;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
@@ -195,14 +196,13 @@
@Override
public BTreeAccessor createAccessor(IIndexAccessParameters iap) {
- return new DiskBTreeAccessor(this, iap.getModificationCallback(), iap.getSearchOperationCallback());
+ return new DiskBTreeAccessor(this, iap);
}
public class DiskBTreeAccessor extends BTreeAccessor {
- public DiskBTreeAccessor(DiskBTree btree, IModificationOperationCallback modificationCalback,
- ISearchOperationCallback searchCallback) {
- super(btree, modificationCalback, searchCallback);
+ public DiskBTreeAccessor(DiskBTree btree, IIndexAccessParameters iap) {
+ super(btree, iap);
}
@Override
@@ -228,7 +228,8 @@
@Override
public DiskBTreeRangeSearchCursor createSearchCursor(boolean exclusive) {
IBTreeLeafFrame leafFrame = (IBTreeLeafFrame) btree.getLeafFrameFactory().createFrame();
- return new DiskBTreeRangeSearchCursor(leafFrame, exclusive);
+ return new DiskBTreeRangeSearchCursor(leafFrame, exclusive, (IIndexCursorStats) iap.getParameters()
+ .getOrDefault(HyracksConstants.INDEX_CURSOR_STATS, NoOpIndexCursorStats.INSTANCE));
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTreeRangeSearchCursor.java
index 0e82088..d26378b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTreeRangeSearchCursor.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
@@ -36,6 +37,10 @@
super(frame, exclusiveLatchNodes);
}
+ public DiskBTreeRangeSearchCursor(IBTreeLeafFrame frame, boolean exclusiveLatchNodes, IIndexCursorStats stats) {
+ super(frame, exclusiveLatchNodes, stats);
+ }
+
@Override
public boolean doHasNext() throws HyracksDataException {
int nextLeafPage;
@@ -95,6 +100,7 @@
@Override
protected ICachedPage acquirePage(int pageId) throws HyracksDataException {
+ stats.getPageCounter().update(1);
return bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/IndexAccessParameters.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/IndexAccessParameters.java
index 47b5a99..11d3cd5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/IndexAccessParameters.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/IndexAccessParameters.java
@@ -21,9 +21,12 @@
import java.util.HashMap;
import java.util.Map;
+import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
+import org.apache.hyracks.storage.common.NoOpIndexCursorStats;
public class IndexAccessParameters implements IIndexAccessParameters {
@@ -56,4 +59,15 @@
return paramMap;
}
+ public static IIndexAccessParameters createNoOpParams(IIndexCursorStats stats) {
+ if (stats == NoOpIndexCursorStats.INSTANCE) {
+ return NoOpIndexAccessParameters.INSTANCE;
+ } else {
+ IndexAccessParameters iap =
+ new IndexAccessParameters(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ iap.getParameters().put(HyracksConstants.INDEX_CURSOR_STATS, stats);
+ return iap;
+ }
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpIndexAccessParameters.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpIndexAccessParameters.java
index 6a88071..fa7811c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpIndexAccessParameters.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpIndexAccessParameters.java
@@ -47,5 +47,4 @@
public Map<String, Object> getParameters() {
return paramMap;
}
-
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
index 0308ef1..e727dc1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
@@ -58,8 +58,10 @@
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.IndexCursorStats;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
@@ -189,7 +191,8 @@
returnDeletedTuples = true;
}
}
- LSMBTreeRangeSearchCursor cursor = new LSMBTreeRangeSearchCursor(opCtx, returnDeletedTuples);
+ IIndexCursorStats stats = new IndexCursorStats();
+ LSMBTreeRangeSearchCursor cursor = new LSMBTreeRangeSearchCursor(opCtx, returnDeletedTuples, stats);
BTree lastBTree = ((LSMBTreeDiskComponent) mergingComponents.get(0)).getIndex();
BTree firstBTree = ((LSMBTreeDiskComponent) mergingComponents.get(mergingComponents.size() - 1)).getIndex();
FileReference firstFile = firstBTree.getFileReference();
@@ -197,7 +200,7 @@
LSMComponentFileReferences relMergeFileRefs =
fileManager.getRelMergeFileReference(firstFile.getFile().getName(), lastFile.getFile().getName());
ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getHarness(), opCtx, cursorFactory);
- LSMBTreeMergeOperation mergeOp = new LSMBTreeMergeOperation(accessor, cursor,
+ LSMBTreeMergeOperation mergeOp = new LSMBTreeMergeOperation(accessor, cursor, stats,
relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(),
ioOpCallback, fileManager.getBaseDir().getAbsolutePath());
ioOpCallback.scheduled(mergeOp);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
index d207b94..9855571 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
@@ -63,8 +63,10 @@
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.IndexCursorStats;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.util.trace.ITracer;
@@ -274,7 +276,8 @@
ILSMIndexOperationContext bctx = createOpContext(NoOpOperationCallback.INSTANCE, 0);
bctx.setOperation(IndexOperation.MERGE);
List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
- LSMBTreeWithBuddySortedCursor cursor = new LSMBTreeWithBuddySortedCursor(bctx, buddyBTreeFields);
+ IIndexCursorStats stats = new IndexCursorStats();
+ LSMBTreeWithBuddySortedCursor cursor = new LSMBTreeWithBuddySortedCursor(bctx, buddyBTreeFields, stats);
LSMComponentFileReferences relMergeFileRefs = getMergeTargetFileName(mergingComponents);
ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getHarness(), bctx,
opCtx -> new LSMBTreeWithBuddySearchCursor(opCtx, buddyBTreeFields));
@@ -291,10 +294,10 @@
.get(secondDiskComponents.size() - 1);
}
- LSMBTreeWithBuddyMergeOperation mergeOp =
- new LSMBTreeWithBuddyMergeOperation(accessor, cursor, relMergeFileRefs.getInsertIndexFileReference(),
- relMergeFileRefs.getDeleteIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(),
- ioOpCallback, fileManager.getBaseDir().getAbsolutePath(), keepDeleteTuples);
+ LSMBTreeWithBuddyMergeOperation mergeOp = new LSMBTreeWithBuddyMergeOperation(accessor, cursor, stats,
+ relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getDeleteIndexFileReference(),
+ relMergeFileRefs.getBloomFilterFileReference(), ioOpCallback,
+ fileManager.getBaseDir().getAbsolutePath(), keepDeleteTuples);
ioOpCallback.scheduled(mergeOp);
return mergeOp;
@@ -328,7 +331,7 @@
if (mergeOp.isKeepDeletedTuples()) {
// Keep the deleted tuples since the oldest disk component is not
// included in the merge operation
- LSMBuddyBTreeMergeCursor buddyBtreeCursor = new LSMBuddyBTreeMergeCursor(opCtx);
+ LSMBuddyBTreeMergeCursor buddyBtreeCursor = new LSMBuddyBTreeMergeCursor(opCtx, mergeOp.getCursorStats());
search(opCtx, buddyBtreeCursor, btreeSearchPred);
long numElements = 0L;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index c041b06..c3d1416 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -68,7 +68,9 @@
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.IIndexAccessor;
import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.IndexCursorStats;
import org.apache.hyracks.storage.common.MultiComparator;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
@@ -480,8 +482,9 @@
if (mergingComponents.get(mergingComponents.size() - 1) != diskComponents.get(diskComponents.size() - 1)) {
returnDeletedTuples = true;
}
- LSMBTreeRangeSearchCursor cursor = new LSMBTreeRangeSearchCursor(opCtx, returnDeletedTuples);
- return new LSMBTreeMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(),
+ IIndexCursorStats stats = new IndexCursorStats();
+ LSMBTreeRangeSearchCursor cursor = new LSMBTreeRangeSearchCursor(opCtx, returnDeletedTuples, stats);
+ return new LSMBTreeMergeOperation(accessor, cursor, stats, mergeFileRefs.getInsertIndexFileReference(),
mergeFileRefs.getBloomFilterFileReference(), callback, getIndexIdentifier());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java
index ff0bfd2..667a8dd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java
@@ -40,6 +40,7 @@
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.NoOpIndexCursorStats;
import org.apache.hyracks.storage.common.util.IndexCursorUtils;
public class LSMBTreeDiskComponentScanCursor extends LSMIndexSearchCursor {
@@ -59,7 +60,7 @@
private IntegerPointable cursorIndexPointable;
public LSMBTreeDiskComponentScanCursor(ILSMIndexOperationContext opCtx) {
- super(opCtx, true);
+ super(opCtx, true, NoOpIndexCursorStats.INSTANCE);
this.outputTuple = new ArrayTupleReference();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
index 3aef4c2..fca4c70 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
@@ -25,14 +25,16 @@
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation;
import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
public class LSMBTreeMergeOperation extends MergeOperation {
private final FileReference bloomFilterMergeTarget;
- public LSMBTreeMergeOperation(ILSMIndexAccessor accessor, IIndexCursor cursor, FileReference target,
- FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier) {
- super(accessor, target, callback, indexIdentifier, cursor);
+ public LSMBTreeMergeOperation(ILSMIndexAccessor accessor, IIndexCursor cursor, IIndexCursorStats stats,
+ FileReference target, FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback,
+ String indexIdentifier) {
+ super(accessor, target, callback, indexIdentifier, cursor, stats);
this.bloomFilterMergeTarget = bloomFilterMergeTarget;
}
@@ -44,4 +46,5 @@
public LSMComponentFileReferences getComponentFiles() {
return new LSMComponentFileReferences(target, null, bloomFilterMergeTarget);
}
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
index 3520e3a..c376262 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
@@ -31,7 +31,6 @@
import org.apache.hyracks.storage.am.common.api.ILSMIndexCursor;
import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
@@ -200,7 +199,7 @@
btreeCursors[i] = btreeAccessors[i].createPointCursor(false);
} else {
// re-use
- btreeAccessors[i].reset(btree, NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ btreeAccessors[i].reset(btree, NoOpIndexAccessParameters.INSTANCE);
btreeCursors[i].close();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index fb0b9ac0..46d279f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -30,8 +30,6 @@
import org.apache.hyracks.storage.am.btree.impls.BTree;
import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
-import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
@@ -40,8 +38,10 @@
import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
import org.apache.hyracks.storage.common.ICursorInitialState;
import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.NoOpIndexCursorStats;
import org.apache.hyracks.storage.common.util.IndexCursorUtils;
public class LSMBTreeRangeSearchCursor extends LSMIndexSearchCursor {
@@ -56,11 +56,12 @@
private int tupleFromMemoryComponentCount = 0;
public LSMBTreeRangeSearchCursor(ILSMIndexOperationContext opCtx) {
- this(opCtx, false);
+ this(opCtx, false, NoOpIndexCursorStats.INSTANCE);
}
- public LSMBTreeRangeSearchCursor(ILSMIndexOperationContext opCtx, boolean returnDeletedTuples) {
- super(opCtx, returnDeletedTuples);
+ public LSMBTreeRangeSearchCursor(ILSMIndexOperationContext opCtx, boolean returnDeletedTuples,
+ IIndexCursorStats stats) {
+ super(opCtx, returnDeletedTuples, stats);
this.copyTuple = new ArrayTupleReference();
this.reusablePred = new RangePredicate(null, null, true, true, null, null);
}
@@ -233,7 +234,7 @@
switchComponentTupleBuilders[i].getByteArray());
reusablePred.setLowKey(copyTuple, true);
rangeCursors[i].close();
- btreeAccessors[i].reset(btree, NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ btreeAccessors[i].reset(btree, iap);
btreeAccessors[i].search(rangeCursors[i], reusablePred);
pushIntoQueueFromCursorAndReplaceThisElement(switchedElements[i]);
}
@@ -370,11 +371,11 @@
}
btree = (BTree) component.getIndex();
if (btreeAccessors[i] == null || destroyIncompatible(component, i)) {
- btreeAccessors[i] = btree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ btreeAccessors[i] = btree.createAccessor(iap);
rangeCursors[i] = btreeAccessors[i].createSearchCursor(false);
} else {
// re-use
- btreeAccessors[i].reset(btree, NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ btreeAccessors[i].reset(btree, iap);
rangeCursors[i].close();
}
isMemoryComponent[i] = component.getType() == LSMComponentType.MEMORY;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyAbstractCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyAbstractCursor.java
index 08e7b91..e7b8bbd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyAbstractCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyAbstractCursor.java
@@ -30,14 +30,15 @@
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
import org.apache.hyracks.storage.am.common.api.ILSMIndexCursor;
import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
-import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.common.EnforcedIndexCursor;
import org.apache.hyracks.storage.common.ICursorInitialState;
+import org.apache.hyracks.storage.common.IIndexAccessParameters;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
@@ -59,14 +60,16 @@
protected ILSMHarness lsmHarness;
protected boolean foundNext;
protected final ILSMIndexOperationContext opCtx;
+ protected final IIndexAccessParameters iap;
protected final long[] hashes = BloomFilter.createHashArray();
protected List<ILSMComponent> operationalComponents;
- public LSMBTreeWithBuddyAbstractCursor(ILSMIndexOperationContext opCtx) {
+ public LSMBTreeWithBuddyAbstractCursor(ILSMIndexOperationContext opCtx, IIndexCursorStats stats) {
super();
this.opCtx = opCtx;
+ this.iap = IndexAccessParameters.createNoOpParams(stats);
buddyBtreeRangePredicate = new RangePredicate(null, null, true, true, null, null);
}
@@ -127,13 +130,12 @@
IBTreeLeafFrame leafFrame = (IBTreeLeafFrame) lsmInitialState.getBTreeLeafFrameFactory().createFrame();
if (btreeAccessors[i] == null) {
btreeCursors[i] = new BTreeRangeSearchCursor(leafFrame, false);
- btreeAccessors[i] = btree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- buddyBtreeAccessors[i] = buddyBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ btreeAccessors[i] = btree.createAccessor(iap);
+ buddyBtreeAccessors[i] = buddyBtree.createAccessor(iap);
} else {
btreeCursors[i].close();
- btreeAccessors[i].reset(btree, NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- buddyBtreeAccessors[i].reset(buddyBtree, NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE);
+ btreeAccessors[i].reset(btree, iap);
+ buddyBtreeAccessors[i].reset(buddyBtree, iap);
}
}
btreeRangePredicate = (RangePredicate) searchPred;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java
index dd4fcf5..3876afa 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation;
import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
public class LSMBTreeWithBuddyMergeOperation extends MergeOperation {
@@ -31,10 +32,10 @@
private final FileReference bloomFilterMergeTarget;
private final boolean keepDeletedTuples;
- public LSMBTreeWithBuddyMergeOperation(ILSMIndexAccessor accessor, IIndexCursor cursor, FileReference target,
- FileReference buddyBtreeMergeTarget, FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback,
- String indexIdentifier, boolean keepDeletedTuples) {
- super(accessor, target, callback, indexIdentifier, cursor);
+ public LSMBTreeWithBuddyMergeOperation(ILSMIndexAccessor accessor, IIndexCursor cursor, IIndexCursorStats stats,
+ FileReference target, FileReference buddyBtreeMergeTarget, FileReference bloomFilterMergeTarget,
+ ILSMIOOperationCallback callback, String indexIdentifier, boolean keepDeletedTuples) {
+ super(accessor, target, callback, indexIdentifier, cursor, stats);
this.buddyBtreeMergeTarget = buddyBtreeMergeTarget;
this.bloomFilterMergeTarget = bloomFilterMergeTarget;
this.keepDeletedTuples = keepDeletedTuples;
@@ -56,4 +57,5 @@
public LSMComponentFileReferences getComponentFiles() {
return new LSMComponentFileReferences(target, buddyBtreeMergeTarget, bloomFilterMergeTarget);
}
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddySearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddySearchCursor.java
index 7aaa946..ac3c8ae7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddySearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddySearchCursor.java
@@ -24,14 +24,21 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.common.ICursorInitialState;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.NoOpIndexCursorStats;
public class LSMBTreeWithBuddySearchCursor extends LSMBTreeWithBuddyAbstractCursor {
private int currentCursor;
private final PermutingTupleReference buddyBTreeTuple;
public LSMBTreeWithBuddySearchCursor(ILSMIndexOperationContext opCtx, int[] buddyBTreeFields) {
- super(opCtx);
+ this(opCtx, buddyBTreeFields, NoOpIndexCursorStats.INSTANCE);
+ }
+
+ public LSMBTreeWithBuddySearchCursor(ILSMIndexOperationContext opCtx, int[] buddyBTreeFields,
+ IIndexCursorStats stats) {
+ super(opCtx, stats);
currentCursor = 0;
this.buddyBTreeTuple = new PermutingTupleReference(buddyBTreeFields);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddySortedCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddySortedCursor.java
index 4825104..ce47589 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddySortedCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddySortedCursor.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.common.ICursorInitialState;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.util.IndexCursorUtils;
@@ -36,9 +37,9 @@
private int foundIn = -1;
private PermutingTupleReference buddyBtreeTuple;
- public LSMBTreeWithBuddySortedCursor(ILSMIndexOperationContext opCtx, int[] buddyBTreeFields)
- throws HyracksDataException {
- super(opCtx);
+ public LSMBTreeWithBuddySortedCursor(ILSMIndexOperationContext opCtx, int[] buddyBTreeFields,
+ IIndexCursorStats stats) throws HyracksDataException {
+ super(opCtx, stats);
this.buddyBtreeTuple = new PermutingTupleReference(buddyBTreeFields);
close();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBuddyBTreeMergeCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBuddyBTreeMergeCursor.java
index b35c5d1..eb4b29d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBuddyBTreeMergeCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBuddyBTreeMergeCursor.java
@@ -19,25 +19,23 @@
package org.apache.hyracks.storage.am.lsm.btree.impls;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.btree.api.IBTreeLeafFrame;
import org.apache.hyracks.storage.am.btree.impls.BTree;
-import org.apache.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
import org.apache.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
import org.apache.hyracks.storage.common.ICursorInitialState;
import org.apache.hyracks.storage.common.IIndexAccessor;
import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.util.IndexCursorUtils;
public class LSMBuddyBTreeMergeCursor extends LSMIndexSearchCursor {
- public LSMBuddyBTreeMergeCursor(ILSMIndexOperationContext opCtx) {
- super(opCtx, true);
+ public LSMBuddyBTreeMergeCursor(ILSMIndexOperationContext opCtx, IIndexCursorStats stats) {
+ super(opCtx, true, stats);
}
@Override
@@ -60,10 +58,9 @@
IIndexAccessor[] btreeAccessors = new ITreeIndexAccessor[numBTrees];
for (int i = 0; i < numBTrees; i++) {
ILSMComponent component = operationalComponents.get(i);
- IBTreeLeafFrame leafFrame = (IBTreeLeafFrame) lsmInitialState.getBuddyBTreeLeafFrameFactory().createFrame();
- rangeCursors[i] = new BTreeRangeSearchCursor(leafFrame, false);
BTree buddyBtree = ((LSMBTreeWithBuddyDiskComponent) component).getBuddyIndex();
- btreeAccessors[i] = buddyBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ btreeAccessors[i] = buddyBtree.createAccessor(iap);
+ rangeCursors[i] = btreeAccessors[i].createSearchCursor(false);
}
IndexCursorUtils.open(btreeAccessors, rangeCursors, btreePredicate);
try {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
index 79463e5..8084c81 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
@@ -153,4 +153,27 @@
* Get parameters passed when calling this IO operation
*/
Map<String, Object> getParameters();
+
+ /**
+ *
+ * @return the estimated number of disk pages remaining for this IO operation
+ */
+ long getRemainingPages();
+
+ /**
+ * Resume this IO operation
+ */
+ void resume();
+
+ /**
+ * Pause this IO operation
+ */
+ void pause();
+
+ /**
+ *
+ * @return whether this IO operation is currently active (i.e., not paused)
+ */
+ boolean isActive();
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationScheduler.java
index 57a3483..5d89f9d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationScheduler.java
@@ -23,7 +23,19 @@
/**
* Schedules IO operations for LSM indexes
*/
-@FunctionalInterface
public interface ILSMIOOperationScheduler {
+
+ /**
+ * Schedule an IO operation
+ * @param operation
+ * @throws HyracksDataException
+ */
void scheduleOperation(ILSMIOOperation operation) throws HyracksDataException;
+
+ /**
+ * Notify an IO operation has completed
+ * @param operation
+ * @throws HyracksDataException
+ */
+ void completeOperation(ILSMIOOperation operation) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationSchedulerFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationSchedulerFactory.java
new file mode 100644
index 0000000..1c8a4e1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationSchedulerFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import java.util.concurrent.ThreadFactory;
+
+public interface ILSMIOOperationSchedulerFactory {
+ ILSMIOOperationScheduler createIoScheduler(ThreadFactory threadFactory, IIoOperationFailedCallback callback);
+
+ String getName();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java
new file mode 100644
index 0000000..78185f0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
+
+public abstract class AbstractAsynchronousScheduler implements ILSMIOOperationScheduler, Closeable {
+ protected final ExecutorService executor;
+ protected final Map<String, ILSMIOOperation> runningFlushOperations = new HashMap<>();
+ protected final Map<String, Deque<ILSMIOOperation>> waitingFlushOperations = new HashMap<>();
+ protected final Map<String, Throwable> failedGroups = new HashMap<>();
+
+ public AbstractAsynchronousScheduler(ThreadFactory threadFactory, final IIoOperationFailedCallback callback) {
+ executor = new IoOperationExecutor(threadFactory, this, callback, runningFlushOperations,
+ waitingFlushOperations, failedGroups);
+ }
+
+ @Override
+ public void scheduleOperation(ILSMIOOperation operation) {
+ switch (operation.getIOOpertionType()) {
+ case FLUSH:
+ scheduleFlush(operation);
+ break;
+ case MERGE:
+ scheduleMerge(operation);
+ break;
+ case NOOP:
+ return;
+ default:
+ // this should never happen
+ // just guard here to avoid silent failures in case of future extensions
+ throw new IllegalArgumentException("Unknown operation type " + operation.getIOOpertionType());
+ }
+ }
+
+ protected abstract void scheduleMerge(ILSMIOOperation operation);
+
+ protected void scheduleFlush(ILSMIOOperation operation) {
+ String id = operation.getIndexIdentifier();
+ synchronized (executor) {
+ if (failedGroups.containsKey(id)) {
+ // Group failure. Fail the operation right away
+ operation.setStatus(LSMIOOperationStatus.FAILURE);
+ operation.setFailure(new RuntimeException("Operation group " + id + " has permanently failed",
+ failedGroups.get(id)));
+ operation.complete();
+ return;
+ }
+ if (runningFlushOperations.containsKey(id)) {
+ if (waitingFlushOperations.containsKey(id)) {
+ waitingFlushOperations.get(id).offer(operation);
+ } else {
+ Deque<ILSMIOOperation> q = new ArrayDeque<>();
+ q.offer(operation);
+ waitingFlushOperations.put(id, q);
+ }
+ } else {
+ runningFlushOperations.put(id, operation);
+ executor.submit(operation);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ executor.shutdown();
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
index 38ef179..0938b5f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
@@ -21,6 +21,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
@@ -46,6 +47,8 @@
private boolean completed = false;
private List<IoOperationCompleteListener> completeListeners;
+ private final AtomicBoolean isActive = new AtomicBoolean(true);
+
public AbstractIoOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback,
String indexIdentifier) {
this.accessor = accessor;
@@ -180,4 +183,35 @@
public boolean hasFailed() {
return status == LSMIOOperationStatus.FAILURE;
}
+
+ @Override
+ public void resume() {
+ synchronized (this) {
+ isActive.set(true);
+ notifyAll();
+ }
+ }
+
+ @Override
+ public void pause() {
+ isActive.set(false);
+ }
+
+ @Override
+ public boolean isActive() {
+ return isActive.get();
+ }
+
+ public void waitIfPaused() throws HyracksDataException {
+ synchronized (this) {
+ while (!isActive.get()) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e);
+ }
+ }
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
index e4b845a..ac3481c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
@@ -18,80 +18,43 @@
*/
package org.apache.hyracks.storage.am.lsm.common.impls;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerFactory;
-public class AsynchronousScheduler implements ILSMIOOperationScheduler, Closeable {
- // Since this is a asynchronous scheduler, we make sure that flush operations coming from the same lsm index
- // will be executed serially in same order of scheduling the operations. Look at asterix issue 630.
+/**
+ * The asynchronous scheduler schedules merge operations as they arrive and allocate disk bandwidth to them
+ * fairly. It avoids starvation of any merge. It is important to use this scheduler when measuring system performance.
+ *
+ */
+public class AsynchronousScheduler extends AbstractAsynchronousScheduler {
- private final ExecutorService executor;
- private final Map<String, ILSMIOOperation> runningFlushOperations = new HashMap<>();
- private final Map<String, Deque<ILSMIOOperation>> waitingFlushOperations = new HashMap<>();
- private final Map<String, Throwable> failedGroups = new HashMap<>();
+ public static final ILSMIOOperationSchedulerFactory FACTORY = new ILSMIOOperationSchedulerFactory() {
+ @Override
+ public ILSMIOOperationScheduler createIoScheduler(ThreadFactory threadFactory,
+ IIoOperationFailedCallback callback) {
+ return new AsynchronousScheduler(threadFactory, callback);
+ }
- public AsynchronousScheduler(ThreadFactory threadFactory, final IIoOperationFailedCallback callback) {
- executor = new IoOperationExecutor(threadFactory, this, callback, runningFlushOperations,
- waitingFlushOperations, failedGroups);
+ public String getName() {
+ return "async";
+ }
+ };
+
+ public AsynchronousScheduler(ThreadFactory threadFactory, IIoOperationFailedCallback callback) {
+ super(threadFactory, callback);
}
@Override
- public void scheduleOperation(ILSMIOOperation operation) {
- switch (operation.getIOOpertionType()) {
- case FLUSH:
- scheduleFlush(operation);
- break;
- case MERGE:
- executor.submit(operation);
- break;
- case NOOP:
- return;
- default:
- // this should never happen
- // just guard here to avoid silent failures in case of future extensions
- throw new IllegalArgumentException("Unknown operation type " + operation.getIOOpertionType());
- }
- }
-
- private void scheduleFlush(ILSMIOOperation operation) {
- String id = operation.getIndexIdentifier();
- synchronized (executor) {
- if (failedGroups.containsKey(id)) {
- // Group failure. Fail the operation right away
- operation.setStatus(LSMIOOperationStatus.FAILURE);
- operation.setFailure(new RuntimeException("Operation group " + id + " has permanently failed",
- failedGroups.get(id)));
- operation.complete();
- return;
- }
- if (runningFlushOperations.containsKey(id)) {
- if (waitingFlushOperations.containsKey(id)) {
- waitingFlushOperations.get(id).offer(operation);
- } else {
- Deque<ILSMIOOperation> q = new ArrayDeque<>();
- q.offer(operation);
- waitingFlushOperations.put(id, q);
- }
- } else {
- runningFlushOperations.put(id, operation);
- executor.submit(operation);
- }
- }
+ protected void scheduleMerge(ILSMIOOperation operation) {
+ executor.submit(operation);
}
@Override
- public void close() throws IOException {
- executor.shutdown();
+ public void completeOperation(ILSMIOOperation operation) {
+ // no op
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
index 1516136..8e5c982 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
@@ -27,6 +27,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.util.annotations.CriticalPath;
@@ -35,12 +36,15 @@
*/
public class ChainedLSMDiskComponentBulkLoader implements ILSMDiskComponentBulkLoader {
+ private static final int CHECK_CYCLE = 1000;
+
private List<IChainedComponentBulkLoader> bulkloaderChain = new ArrayList<>();
private final ILSMIOOperation operation;
private final ILSMDiskComponent diskComponent;
private final boolean cleanupEmptyComponent;
private boolean isEmptyComponent = true;
private boolean cleanedUpArtifacts = false;
+ private int tupleCounter = 0;
public ChainedLSMDiskComponentBulkLoader(ILSMIOOperation operation, ILSMDiskComponent diskComponent,
boolean cleanupEmptyComponent) {
@@ -63,6 +67,7 @@
for (int i = 0; i < bulkloadersCount; i++) {
t = bulkloaderChain.get(i).add(t);
}
+ checkOperation();
} catch (Throwable e) {
operation.setFailure(e);
cleanupArtifacts();
@@ -83,6 +88,7 @@
for (int i = 0; i < bulkloadersCount; i++) {
t = bulkloaderChain.get(i).delete(t);
}
+ checkOperation();
} catch (Throwable e) {
operation.setFailure(e);
cleanupArtifacts();
@@ -165,4 +171,11 @@
bulkLoader.force();
}
}
+
+ private void checkOperation() throws HyracksDataException {
+ if (operation.getIOOpertionType() == LSMIOOperationType.MERGE && ++tupleCounter % CHECK_CYCLE == 0) {
+ tupleCounter = 0;
+ ((MergeOperation) operation).waitIfPaused();
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java
index b2a2e48..260793a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java
@@ -73,4 +73,9 @@
public int hashCode() {
return target.getFile().getName().hashCode();
}
+
+ @Override
+ public long getRemainingPages() {
+ return 0;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/GreedyScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/GreedyScheduler.java
new file mode 100644
index 0000000..742ae24
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/GreedyScheduler.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerFactory;
+
+/**
+ * This is a greedy asynchronous scheduler that always allocates the full bandwidth for the merge operation
+ * with the smallest required disk bandwidth to minimize the number of disk components. It has been proven
+ * that if the number of components in all merge operations are the same, then this scheduler is optimal
+ * by always minimizing the number of disk components over time; if not, this is still a good heuristic
+ *
+ */
+public class GreedyScheduler extends AbstractAsynchronousScheduler {
+ public static final ILSMIOOperationSchedulerFactory FACTORY = new ILSMIOOperationSchedulerFactory() {
+ @Override
+ public ILSMIOOperationScheduler createIoScheduler(ThreadFactory threadFactory,
+ IIoOperationFailedCallback callback) {
+ return new GreedyScheduler(threadFactory, callback);
+ }
+
+ public String getName() {
+ return "greedy";
+ }
+ };
+
+ private final Map<String, List<ILSMIOOperation>> mergeOperations = new HashMap<>();
+
+ public GreedyScheduler(ThreadFactory threadFactory, IIoOperationFailedCallback callback) {
+ super(threadFactory, callback);
+ }
+
+ protected void scheduleMerge(ILSMIOOperation operation) {
+ operation.pause();
+ String id = operation.getIndexIdentifier();
+ synchronized (executor) {
+ List<ILSMIOOperation> mergeOpList = mergeOperations.computeIfAbsent(id, key -> new ArrayList<>());
+ mergeOpList.add(operation);
+ dispatchMergeOperation(mergeOpList);
+ }
+ executor.submit(operation);
+ }
+
+ private void dispatchMergeOperation(List<ILSMIOOperation> mergeOps) {
+ ILSMIOOperation activeOp = null;
+ ILSMIOOperation smallestMergeOp = null;
+ for (ILSMIOOperation op : mergeOps) {
+ if (op.isActive()) {
+ activeOp = op;
+ }
+ if (smallestMergeOp == null || op.getRemainingPages() < smallestMergeOp.getRemainingPages()) {
+ smallestMergeOp = op;
+ }
+ }
+ if (smallestMergeOp != activeOp) {
+ if (activeOp != null) {
+ activeOp.pause();
+ }
+ smallestMergeOp.resume();
+ }
+ }
+
+ @Override
+ public void completeOperation(ILSMIOOperation op) {
+ if (op.getIOOpertionType() == LSMIOOperationType.MERGE) {
+ String id = op.getIndexIdentifier();
+ synchronized (executor) {
+ List<ILSMIOOperation> mergeOpList = mergeOperations.get(id);
+ mergeOpList.remove(op);
+ if (!mergeOpList.isEmpty()) {
+ dispatchMergeOperation(mergeOpList);
+ }
+ }
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IoOperationExecutor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IoOperationExecutor.java
index 354b1af..d5354ed 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IoOperationExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IoOperationExecutor.java
@@ -79,6 +79,7 @@
if (!failed || executedOp.getIOOpertionType() != LSMIOOperationType.FLUSH) {
executedOp.complete(); // destroy if merge or successful flush
}
+ scheduler.completeOperation(executedOp);
if (executedOp.getIOOpertionType() == LSMIOOperationType.FLUSH) {
String id = executedOp.getIndexIdentifier();
synchronized (this) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
index 12caec4..b6f6e26 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
@@ -27,13 +27,16 @@
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.ILSMIndexCursor;
+import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMTreeTupleReference;
import org.apache.hyracks.storage.common.EnforcedIndexCursor;
+import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.MultiComparator;
public abstract class LSMIndexSearchCursor extends EnforcedIndexCursor implements ILSMIndexCursor {
@@ -56,8 +59,9 @@
protected int hasNextCallCount = 0;
protected List<ILSMComponent> operationalComponents;
+ protected final IIndexAccessParameters iap;
- public LSMIndexSearchCursor(ILSMIndexOperationContext opCtx, boolean returnDeletedTuples) {
+ public LSMIndexSearchCursor(ILSMIndexOperationContext opCtx, boolean returnDeletedTuples, IIndexCursorStats stats) {
this.opCtx = opCtx;
this.returnDeletedTuples = returnDeletedTuples;
outputElement = null;
@@ -65,6 +69,7 @@
switchComponentTupleBuilders = new ArrayTupleBuilder[opCtx.getIndex().getNumberOfAllMemoryComponents()];
switchRequest = new boolean[switchComponentTupleBuilders.length];
switchedElements = new PriorityQueueElement[switchComponentTupleBuilders.length];
+ this.iap = IndexAccessParameters.createNoOpParams(stats);
}
public ILSMIndexOperationContext getOpCtx() {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LoadOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LoadOperation.java
index 21c52d0..170076c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LoadOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LoadOperation.java
@@ -59,4 +59,9 @@
public Map<String, Object> getParameters() {
return parameters;
}
+
+ @Override
+ public long getRemainingPages() {
+ return 0;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java
index 9d7c449..74f8913 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java
@@ -22,18 +22,25 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.storage.am.lsm.common.api.AbstractLSMWithBloomFilterDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
public abstract class MergeOperation extends AbstractIoOperation {
protected final IIndexCursor cursor;
+ protected final IIndexCursorStats stats;
+ protected final long totalPages;
public MergeOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback,
- String indexIdentifier, IIndexCursor cursor) {
+ String indexIdentifier, IIndexCursor cursor, IIndexCursorStats stats) {
super(accessor, target, callback, indexIdentifier);
this.cursor = cursor;
+ this.stats = stats;
+ this.totalPages = computeTotalComponentPages(accessor);
}
public List<ILSMComponent> getMergingComponents() {
@@ -54,4 +61,31 @@
public IIndexCursor getCursor() {
return cursor;
}
+
+ private long computeTotalComponentPages(ILSMIndexAccessor accessor) {
+ List<ILSMDiskComponent> components = accessor.getOpContext().getComponentsToBeMerged();
+ long totalSize = 0;
+ for (ILSMDiskComponent component : components) {
+ long componentSize = component.getComponentSize();
+ if (component instanceof AbstractLSMWithBloomFilterDiskComponent) {
+ // exclude the size of bloom filters since we do not scan them during merge
+ componentSize -= ((AbstractLSMWithBloomFilterDiskComponent) component).getBloomFilter()
+ .getFileReference().getFile().length();
+ }
+ totalSize += componentSize;
+ }
+ return totalSize / accessor.getOpContext().getIndex().getBufferCache().getPageSize();
+ }
+
+ public long getRemainingPages() {
+ return totalPages - stats.getPageCounter().get();
+ }
+
+ public long getTotalPages() {
+ return totalPages;
+ }
+
+ public IIndexCursorStats getCursorStats() {
+ return stats;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java
index cf0f4e5..7351bdf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java
@@ -138,4 +138,24 @@
return false;
}
+ @Override
+ public long getRemainingPages() {
+ return 0;
+ }
+
+ @Override
+ public void resume() {
+ // No Op
+ }
+
+ @Override
+ public void pause() {
+ // No Op
+ }
+
+ @Override
+ public boolean isActive() {
+ return false;
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java
index ae3b4e3..e6fb3b1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java
@@ -21,6 +21,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
@@ -55,6 +56,11 @@
}
}
+ @Override
+ public void completeOperation(ILSMIOOperation operation) throws HyracksDataException {
+ // no op
+ }
+
private void run(ILSMIOOperation operation) {
try {
operation.call();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
index a2304ed..8adf5f7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
@@ -174,4 +174,24 @@
public boolean hasFailed() {
return ioOp.hasFailed();
}
+
+ @Override
+ public long getRemainingPages() {
+ return ioOp.getRemainingPages();
+ }
+
+ @Override
+ public void resume() {
+ ioOp.resume();
+ }
+
+ @Override
+ public void pause() {
+ ioOp.pause();
+ }
+
+ @Override
+ public boolean isActive() {
+ return ioOp.isActive();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IInPlaceInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IInPlaceInvertedIndex.java
index 27966d1..ed4287d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IInPlaceInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IInPlaceInvertedIndex.java
@@ -22,6 +22,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
public interface IInPlaceInvertedIndex extends IInvertedIndex {
/**
@@ -37,7 +38,7 @@
*
* @throws HyracksDataException
*/
- InvertedListCursor createInvertedListRangeSearchCursor() throws HyracksDataException;
+ InvertedListCursor createInvertedListRangeSearchCursor(IIndexCursorStats stats) throws HyracksDataException;
/**
* Opens an inverted list cursor
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 499720a..068df9a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -39,7 +39,6 @@
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IComponentFilterHelper;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
@@ -68,9 +67,10 @@
import org.apache.hyracks.storage.am.lsm.invertedindex.util.InvertedIndexUtils;
import org.apache.hyracks.storage.common.ICursorInitialState;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
-import org.apache.hyracks.storage.common.IIndexAccessor;
import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.IndexCursorStats;
import org.apache.hyracks.storage.common.MultiComparator;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.util.trace.ITracer;
@@ -191,39 +191,16 @@
@Override
public void search(ILSMIndexOperationContext ictx, IIndexCursor cursor, ISearchPredicate pred)
throws HyracksDataException {
- LSMInvertedIndexOpContext ctx = (LSMInvertedIndexOpContext) ictx;
List<ILSMComponent> operationalComponents = ictx.getComponentHolder();
- int numComponents = operationalComponents.size();
boolean includeMutableComponent = false;
- ArrayList<IIndexAccessor> indexAccessors = new ArrayList<>(numComponents);
- ArrayList<IIndexAccessor> deletedKeysBTreeAccessors = new ArrayList<>(numComponents);
- for (int i = 0; i < operationalComponents.size(); i++) {
- ILSMComponent component = operationalComponents.get(i);
- if (component.getType() == LSMComponentType.MEMORY) {
- includeMutableComponent = true;
- IIndexAccessor invIndexAccessor = component.getIndex().createAccessor(ctx.getIndexAccessParameters());
- indexAccessors.add(invIndexAccessor);
- IIndexAccessor deletedKeysAccessor = ((LSMInvertedIndexMemoryComponent) component).getBuddyIndex()
- .createAccessor(NoOpIndexAccessParameters.INSTANCE);
- deletedKeysBTreeAccessors.add(deletedKeysAccessor);
- } else {
- IIndexAccessor invIndexAccessor = component.getIndex().createAccessor(ctx.getIndexAccessParameters());
- indexAccessors.add(invIndexAccessor);
- IIndexAccessor deletedKeysAccessor = ((LSMInvertedIndexDiskComponent) component).getBuddyIndex()
- .createAccessor(NoOpIndexAccessParameters.INSTANCE);
- deletedKeysBTreeAccessors.add(deletedKeysAccessor);
- }
- }
-
- ICursorInitialState initState = createCursorInitialState(pred, ictx, includeMutableComponent, indexAccessors,
- deletedKeysBTreeAccessors, operationalComponents);
+ ICursorInitialState initState =
+ createCursorInitialState(pred, ictx, includeMutableComponent, operationalComponents);
cursor.open(initState, pred);
}
private ICursorInitialState createCursorInitialState(ISearchPredicate pred, IIndexOperationContext ictx,
- boolean includeMutableComponent, ArrayList<IIndexAccessor> indexAccessors,
- ArrayList<IIndexAccessor> deletedKeysBTreeAccessors, List<ILSMComponent> operationalComponents) {
+ boolean includeMutableComponent, List<ILSMComponent> operationalComponents) {
ICursorInitialState initState;
PermutingTupleReference keysOnlyTuple = createKeysOnlyTupleReference();
MultiComparator keyCmp = MultiComparator.create(invListCmpFactories);
@@ -231,8 +208,7 @@
// TODO: This check is not pretty, but it does the job. Come up with something more OO in the future.
// Distinguish between regular searches and range searches (mostly used in merges).
if (pred instanceof InvertedIndexSearchPredicate) {
- initState = new LSMInvertedIndexSearchCursorInitialState(keyCmp, keysOnlyTuple, indexAccessors,
- deletedKeysBTreeAccessors,
+ initState = new LSMInvertedIndexSearchCursorInitialState(keyCmp, keysOnlyTuple,
((LSMInvertedIndexMemoryComponent) memoryComponents.get(currentMutableComponentId.get()))
.getBuddyIndex().getLeafFrameFactory(),
ictx, includeMutableComponent, getHarness(), operationalComponents);
@@ -244,8 +220,7 @@
initState = new LSMInvertedIndexRangeSearchCursorInitialState(tokensAndKeysCmp, keyCmp, keysOnlyTuple,
((LSMInvertedIndexMemoryComponent) memoryComponents.get(currentMutableComponentId.get()))
.getBuddyIndex().getLeafFrameFactory(),
- includeMutableComponent, getHarness(), indexAccessors, deletedKeysBTreeAccessors, pred,
- operationalComponents);
+ includeMutableComponent, getHarness(), pred, operationalComponents);
}
return initState;
}
@@ -361,7 +336,7 @@
.get(diskComponents.size() - 1)) {
// Keep the deleted tuples since the oldest disk component is not included in the merge operation
LSMInvertedIndexDeletedKeysBTreeMergeCursor btreeCursor =
- new LSMInvertedIndexDeletedKeysBTreeMergeCursor(opCtx);
+ new LSMInvertedIndexDeletedKeysBTreeMergeCursor(opCtx, mergeOp.getCursorStats());
try {
long numElements = 0L;
for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) {
@@ -497,8 +472,9 @@
protected ILSMIOOperation createMergeOperation(AbstractLSMIndexOperationContext opCtx,
LSMComponentFileReferences mergeFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException {
ILSMIndexAccessor accessor = new LSMInvertedIndexAccessor(getHarness(), opCtx);
- IIndexCursor cursor = new LSMInvertedIndexMergeCursor(opCtx);
- return new LSMInvertedIndexMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(),
+ IIndexCursorStats stats = new IndexCursorStats();
+ IIndexCursor cursor = new LSMInvertedIndexMergeCursor(opCtx, stats);
+ return new LSMInvertedIndexMergeOperation(accessor, cursor, stats, mergeFileRefs.getInsertIndexFileReference(),
mergeFileRefs.getDeleteIndexFileReference(), mergeFileRefs.getBloomFilterFileReference(), callback,
getIndexIdentifier());
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java
index faa90eb..f9a2e73 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java
@@ -19,23 +19,24 @@
package org.apache.hyracks.storage.am.lsm.invertedindex.impls;
-import java.util.ArrayList;
-
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
import org.apache.hyracks.storage.common.ICursorInitialState;
import org.apache.hyracks.storage.common.IIndexAccessor;
import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
import org.apache.hyracks.storage.common.util.IndexCursorUtils;
public class LSMInvertedIndexDeletedKeysBTreeMergeCursor extends LSMIndexSearchCursor {
- public LSMInvertedIndexDeletedKeysBTreeMergeCursor(ILSMIndexOperationContext opCtx) {
- super(opCtx, true);
+ public LSMInvertedIndexDeletedKeysBTreeMergeCursor(ILSMIndexOperationContext opCtx, IIndexCursorStats stats) {
+ super(opCtx, true, stats);
}
@Override
@@ -57,9 +58,16 @@
MultiComparator keyCmp = lsmInitialState.getKeyComparator();
RangePredicate btreePredicate = new RangePredicate(null, null, true, true, keyCmp, keyCmp);
- ArrayList<IIndexAccessor> btreeAccessors = lsmInitialState.getDeletedKeysBTreeAccessors();
+
+ IIndexAccessor[] btreeAccessors = new IIndexAccessor[operationalComponents.size()];
for (int i = 0; i < numBTrees; i++) {
- rangeCursors[i] = btreeAccessors.get(i).createSearchCursor(false);
+ ILSMComponent component = operationalComponents.get(i);
+ if (component.getType() == LSMComponentType.MEMORY) {
+ btreeAccessors[i] = ((LSMInvertedIndexMemoryComponent) component).getBuddyIndex().createAccessor(iap);
+ } else {
+ btreeAccessors[i] = ((LSMInvertedIndexDiskComponent) component).getBuddyIndex().createAccessor(iap);
+ }
+ rangeCursors[i] = btreeAccessors[i].createSearchCursor(false);
}
IndexCursorUtils.open(btreeAccessors, rangeCursors, btreePredicate);
try {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeCursor.java
index c80455d..dcf4c33 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeCursor.java
@@ -19,7 +19,6 @@
package org.apache.hyracks.storage.am.lsm.invertedindex.impls;
-import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
@@ -29,6 +28,8 @@
import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
import org.apache.hyracks.storage.am.common.api.ILSMIndexCursor;
+import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
+import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
@@ -41,8 +42,10 @@
import org.apache.hyracks.storage.am.lsm.invertedindex.tuples.TokenKeyPairTuple;
import org.apache.hyracks.storage.common.EnforcedIndexCursor;
import org.apache.hyracks.storage.common.ICursorInitialState;
+import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.IIndexAccessor;
import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
@@ -77,12 +80,13 @@
protected IIndexCursor[] deletedKeysBTreeCursors;
protected BloomFilter[] bloomFilters;
protected final long[] hashes = BloomFilter.createHashArray();
- protected ArrayList<IIndexAccessor> deletedKeysBTreeAccessors;
+ protected IIndexAccessor[] deletedKeysBTreeAccessors;
protected RangePredicate deletedKeyBTreeSearchPred;
protected final TokenKeyPairTuple outputTuple;
+ protected final IIndexAccessParameters iap;
- public LSMInvertedIndexMergeCursor(ILSMIndexOperationContext opCtx) {
+ public LSMInvertedIndexMergeCursor(ILSMIndexOperationContext opCtx, IIndexCursorStats stats) {
this.opCtx = (LSMInvertedIndexOpContext) opCtx;
outputTokenElement = null;
outputKeyElement = null;
@@ -96,6 +100,7 @@
this.keyCmp = MultiComparator.create(invertedIndex.getInvListCmpFactories());
this.tokenQueueCmp = new PriorityQueueComparator(tokenCmp);
this.keyQueueCmp = new PriorityQueueComparator(keyCmp);
+ this.iap = IndexAccessParameters.createNoOpParams(stats);
}
public LSMInvertedIndexOpContext getOpCtx() {
@@ -106,29 +111,33 @@
public void doOpen(ICursorInitialState initState, ISearchPredicate searchPred) throws HyracksDataException {
LSMInvertedIndexRangeSearchCursorInitialState lsmInitState =
(LSMInvertedIndexRangeSearchCursorInitialState) initState;
+ List<ILSMComponent> components = lsmInitState.getOperationalComponents();
int numComponents = lsmInitState.getNumComponents();
rangeCursors = new OnDiskInvertedIndexRangeSearchCursor[numComponents];
for (int i = 0; i < numComponents; i++) {
- IInvertedIndexAccessor invIndexAccessor = (IInvertedIndexAccessor) lsmInitState.getIndexAccessors().get(i);
+ IInvertedIndexAccessor invIndexAccessor =
+ (IInvertedIndexAccessor) components.get(i).getIndex().createAccessor(iap);
rangeCursors[i] = (OnDiskInvertedIndexRangeSearchCursor) invIndexAccessor.createRangeSearchCursor();
invIndexAccessor.rangeSearch(rangeCursors[i], lsmInitState.getSearchPredicate());
}
lsmHarness = lsmInitState.getLSMHarness();
operationalComponents = lsmInitState.getOperationalComponents();
- deletedKeysBTreeAccessors = lsmInitState.getDeletedKeysBTreeAccessors();
- bloomFilters = new BloomFilter[deletedKeysBTreeAccessors.size()];
- if (!deletedKeysBTreeAccessors.isEmpty()) {
- deletedKeysBTreeCursors = new IIndexCursor[deletedKeysBTreeAccessors.size()];
- for (int i = 0; i < operationalComponents.size(); i++) {
- ILSMComponent component = operationalComponents.get(i);
- deletedKeysBTreeCursors[i] = deletedKeysBTreeAccessors.get(i).createSearchCursor(false);
- if (component.getType() == LSMComponentType.MEMORY) {
- // No need for a bloom filter for the in-memory BTree.
- bloomFilters[i] = null;
- } else {
- bloomFilters[i] = ((LSMInvertedIndexDiskComponent) component).getBloomFilter();
- }
+ deletedKeysBTreeAccessors = new IIndexAccessor[numComponents];
+ bloomFilters = new BloomFilter[numComponents];
+ deletedKeysBTreeCursors = new IIndexCursor[numComponents];
+ for (int i = 0; i < numComponents; i++) {
+ ILSMComponent component = operationalComponents.get(i);
+ if (component.getType() == LSMComponentType.MEMORY) {
+ // No need for a bloom filter for the in-memory BTree.
+ deletedKeysBTreeAccessors[i] = ((LSMInvertedIndexMemoryComponent) component).getBuddyIndex()
+ .createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ bloomFilters[i] = null;
+ } else {
+ deletedKeysBTreeAccessors[i] = ((LSMInvertedIndexDiskComponent) component).getBuddyIndex()
+ .createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ bloomFilters[i] = ((LSMInvertedIndexDiskComponent) component).getBloomFilter();
}
+ deletedKeysBTreeCursors[i] = deletedKeysBTreeAccessors[i].createSearchCursor(false);
}
deletedKeyBTreeSearchPred = new RangePredicate(null, null, true, true, keyCmp, keyCmp);
initPriorityQueues();
@@ -297,7 +306,7 @@
continue;
}
deletedKeysBTreeCursors[i].close();
- deletedKeysBTreeAccessors.get(i).search(deletedKeysBTreeCursors[i], deletedKeyBTreeSearchPred);
+ deletedKeysBTreeAccessors[i].search(deletedKeysBTreeCursors[i], deletedKeyBTreeSearchPred);
try {
if (deletedKeysBTreeCursors[i].hasNext()) {
return true;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
index 408d9bb..31c79f0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
@@ -25,15 +25,16 @@
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation;
import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
public class LSMInvertedIndexMergeOperation extends MergeOperation {
private final FileReference deletedKeysBTreeMergeTarget;
private final FileReference bloomFilterMergeTarget;
- public LSMInvertedIndexMergeOperation(ILSMIndexAccessor accessor, IIndexCursor cursor, FileReference target,
- FileReference deletedKeysBTreeMergeTarget, FileReference bloomFilterMergeTarget,
+ public LSMInvertedIndexMergeOperation(ILSMIndexAccessor accessor, IIndexCursor cursor, IIndexCursorStats stats,
+ FileReference target, FileReference deletedKeysBTreeMergeTarget, FileReference bloomFilterMergeTarget,
ILSMIOOperationCallback callback, String indexIdentifier) {
- super(accessor, target, callback, indexIdentifier, cursor);
+ super(accessor, target, callback, indexIdentifier, cursor, stats);
this.deletedKeysBTreeMergeTarget = deletedKeysBTreeMergeTarget;
this.bloomFilterMergeTarget = bloomFilterMergeTarget;
}
@@ -50,4 +51,5 @@
public LSMComponentFileReferences getComponentFiles() {
return new LSMComponentFileReferences(target, deletedKeysBTreeMergeTarget, bloomFilterMergeTarget);
}
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
index d5f77ec..08a8512 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
@@ -19,7 +19,7 @@
package org.apache.hyracks.storage.am.lsm.invertedindex.impls;
-import java.util.ArrayList;
+import java.util.List;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.PermutingTupleReference;
@@ -35,6 +35,7 @@
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.NoOpIndexCursorStats;
public class LSMInvertedIndexRangeSearchCursor extends LSMIndexSearchCursor {
@@ -42,23 +43,27 @@
private IIndexCursor[] deletedKeysBTreeCursors;
protected BloomFilter[] bloomFilters;
protected final long[] hashes = BloomFilter.createHashArray();
- protected ArrayList<IIndexAccessor> deletedKeysBTreeAccessors;
+ protected IIndexAccessor[] deletedKeysBTreeAccessors;
protected PermutingTupleReference keysOnlyTuple;
protected RangePredicate keySearchPred;
public LSMInvertedIndexRangeSearchCursor(ILSMIndexOperationContext opCtx) {
- super(opCtx, false);
+ super(opCtx, false, NoOpIndexCursorStats.INSTANCE);
}
@Override
public void doOpen(ICursorInitialState initState, ISearchPredicate searchPred) throws HyracksDataException {
+ LSMInvertedIndexOpContext ctx = (LSMInvertedIndexOpContext) opCtx;
LSMInvertedIndexRangeSearchCursorInitialState lsmInitState =
(LSMInvertedIndexRangeSearchCursorInitialState) initState;
+ List<ILSMComponent> components = lsmInitState.getOperationalComponents();
cmp = lsmInitState.getOriginalKeyComparator();
int numComponents = lsmInitState.getNumComponents();
rangeCursors = new IIndexCursor[numComponents];
for (int i = 0; i < numComponents; i++) {
- IInvertedIndexAccessor invIndexAccessor = (IInvertedIndexAccessor) lsmInitState.getIndexAccessors().get(i);
+ ILSMComponent component = components.get(i);
+ IInvertedIndexAccessor invIndexAccessor =
+ (IInvertedIndexAccessor) component.getIndex().createAccessor(ctx.getIndexAccessParameters());
rangeCursors[i] = invIndexAccessor.createRangeSearchCursor();
invIndexAccessor.rangeSearch(rangeCursors[i], lsmInitState.getSearchPredicate());
}
@@ -68,19 +73,23 @@
// For searching the deleted-keys BTrees.
this.keysOnlyTuple = lsmInitState.getKeysOnlyTuple();
- deletedKeysBTreeAccessors = lsmInitState.getDeletedKeysBTreeAccessors();
- bloomFilters = new BloomFilter[deletedKeysBTreeAccessors.size()];
- if (!deletedKeysBTreeAccessors.isEmpty()) {
- deletedKeysBTreeCursors = new IIndexCursor[deletedKeysBTreeAccessors.size()];
+ bloomFilters = new BloomFilter[numComponents];
+ if (numComponents > 0) {
+ deletedKeysBTreeAccessors = new IIndexAccessor[numComponents];
+ deletedKeysBTreeCursors = new IIndexCursor[numComponents];
for (int i = 0; i < operationalComponents.size(); i++) {
ILSMComponent component = operationalComponents.get(i);
- deletedKeysBTreeCursors[i] = deletedKeysBTreeAccessors.get(i).createSearchCursor(false);
if (component.getType() == LSMComponentType.MEMORY) {
// No need for a bloom filter for the in-memory BTree.
bloomFilters[i] = null;
+ deletedKeysBTreeAccessors[i] =
+ ((LSMInvertedIndexMemoryComponent) component).getBuddyIndex().createAccessor(iap);
} else {
bloomFilters[i] = ((LSMInvertedIndexDiskComponent) component).getBloomFilter();
+ deletedKeysBTreeAccessors[i] =
+ ((LSMInvertedIndexDiskComponent) component).getBuddyIndex().createAccessor(iap);
}
+ deletedKeysBTreeCursors[i] = deletedKeysBTreeAccessors[i].createSearchCursor(false);
}
}
MultiComparator keyCmp = lsmInitState.getKeyComparator();
@@ -101,7 +110,7 @@
if (bloomFilters[i] != null && !bloomFilters[i].contains(keysOnlyTuple, hashes)) {
continue;
}
- deletedKeysBTreeAccessors.get(i).search(deletedKeysBTreeCursors[i], keySearchPred);
+ deletedKeysBTreeAccessors[i].search(deletedKeysBTreeCursors[i], keySearchPred);
try {
if (deletedKeysBTreeCursors[i].hasNext()) {
return true;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursorInitialState.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursorInitialState.java
index 80cc649..b2cd17b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursorInitialState.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursorInitialState.java
@@ -19,7 +19,6 @@
package org.apache.hyracks.storage.am.lsm.invertedindex.impls;
-import java.util.ArrayList;
import java.util.List;
import org.apache.hyracks.dataflow.common.data.accessors.PermutingTupleReference;
@@ -27,7 +26,6 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
import org.apache.hyracks.storage.common.ICursorInitialState;
-import org.apache.hyracks.storage.common.IIndexAccessor;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
@@ -39,8 +37,6 @@
private final MultiComparator keyCmp;
private final ILSMHarness lsmHarness;
- private final ArrayList<IIndexAccessor> indexAccessors;
- private final ArrayList<IIndexAccessor> deletedKeysBTreeAccessors;
private final ISearchPredicate predicate;
private final PermutingTupleReference keysOnlyTuple;
private final ITreeIndexFrameFactory deletedKeysBtreeLeafFrameFactory;
@@ -50,23 +46,20 @@
public LSMInvertedIndexRangeSearchCursorInitialState(MultiComparator tokensAndKeysCmp, MultiComparator keyCmp,
PermutingTupleReference keysOnlyTuple, ITreeIndexFrameFactory deletedKeysBtreeLeafFrameFactory,
- boolean includeMemComponent, ILSMHarness lsmHarness, ArrayList<IIndexAccessor> indexAccessors,
- ArrayList<IIndexAccessor> deletedKeysBTreeAccessors, ISearchPredicate predicate,
+ boolean includeMemComponent, ILSMHarness lsmHarness, ISearchPredicate predicate,
List<ILSMComponent> operationalComponents) {
this.tokensAndKeysCmp = tokensAndKeysCmp;
this.keyCmp = keyCmp;
this.keysOnlyTuple = keysOnlyTuple;
this.deletedKeysBtreeLeafFrameFactory = deletedKeysBtreeLeafFrameFactory;
this.lsmHarness = lsmHarness;
- this.indexAccessors = indexAccessors;
- this.deletedKeysBTreeAccessors = deletedKeysBTreeAccessors;
this.predicate = predicate;
this.includeMemComponent = includeMemComponent;
this.operationalComponents = operationalComponents;
}
public int getNumComponents() {
- return indexAccessors.size();
+ return operationalComponents.size();
}
@Override
@@ -96,14 +89,6 @@
// Do nothing.
}
- public ArrayList<IIndexAccessor> getIndexAccessors() {
- return indexAccessors;
- }
-
- public ArrayList<IIndexAccessor> getDeletedKeysBTreeAccessors() {
- return deletedKeysBTreeAccessors;
- }
-
public ISearchPredicate getSearchPredicate() {
return predicate;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java
index d39c601..6e93ccb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
import org.apache.hyracks.storage.am.common.api.ILSMIndexCursor;
+import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
@@ -49,14 +50,14 @@
private int accessorIndex = -1;
private boolean tupleConsumed = true;
private ILSMHarness harness;
- private List<IIndexAccessor> indexAccessors;
+ private IIndexAccessor[] indexAccessors;
private ISearchPredicate searchPred;
private ISearchOperationCallback searchCallback;
// Assuming the cursor for all deleted-keys indexes are of the same type.
private IIndexCursor[] deletedKeysBTreeCursors;
private BloomFilter[] deletedKeysBTreeBloomFilters;
- private List<IIndexAccessor> deletedKeysBTreeAccessors;
+ private IIndexAccessor[] deletedKeysBTreeAccessors;
private RangePredicate keySearchPred;
private ILSMIndexOperationContext opCtx;
private boolean includeMemoryComponents;
@@ -70,28 +71,34 @@
@Override
public void doOpen(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
LSMInvertedIndexSearchCursorInitialState lsmInitState = (LSMInvertedIndexSearchCursorInitialState) initialState;
+ LSMInvertedIndexOpContext lsmOpCtx = (LSMInvertedIndexOpContext) lsmInitState.getOpContext();
harness = lsmInitState.getLSMHarness();
operationalComponents = lsmInitState.getOperationalComponents();
- indexAccessors = lsmInitState.getIndexAccessors();
+ indexAccessors = new IIndexAccessor[operationalComponents.size()];
opCtx = lsmInitState.getOpContext();
accessorIndex = 0;
this.searchPred = searchPred;
this.searchCallback = lsmInitState.getSearchOperationCallback();
includeMemoryComponents = false;
// For searching the deleted-keys BTrees.
- deletedKeysBTreeAccessors = lsmInitState.getDeletedKeysBTreeAccessors();
- deletedKeysBTreeCursors = new IIndexCursor[deletedKeysBTreeAccessors.size()];
- deletedKeysBTreeBloomFilters = new BloomFilter[deletedKeysBTreeAccessors.size()];
+ deletedKeysBTreeAccessors = new IIndexAccessor[operationalComponents.size()];
+ deletedKeysBTreeCursors = new IIndexCursor[operationalComponents.size()];
+ deletedKeysBTreeBloomFilters = new BloomFilter[operationalComponents.size()];
for (int i = 0; i < operationalComponents.size(); i++) {
ILSMComponent component = operationalComponents.get(i);
- deletedKeysBTreeCursors[i] = deletedKeysBTreeAccessors.get(i).createSearchCursor(false);
+ indexAccessors[i] = component.getIndex().createAccessor(lsmOpCtx.getIndexAccessParameters());
if (component.getType() == LSMComponentType.MEMORY) {
// No need for a bloom filter for the in-memory BTree.
+ deletedKeysBTreeAccessors[i] = ((LSMInvertedIndexMemoryComponent) component).getBuddyIndex()
+ .createAccessor(NoOpIndexAccessParameters.INSTANCE);
deletedKeysBTreeBloomFilters[i] = null;
includeMemoryComponents = true;
} else {
+ deletedKeysBTreeAccessors[i] = ((LSMInvertedIndexDiskComponent) component).getBuddyIndex()
+ .createAccessor(NoOpIndexAccessParameters.INSTANCE);
deletedKeysBTreeBloomFilters[i] = ((LSMInvertedIndexDiskComponent) component).getBloomFilter();
}
+ deletedKeysBTreeCursors[i] = deletedKeysBTreeAccessors[i].createSearchCursor(false);
}
MultiComparator keyCmp = lsmInitState.getKeyComparator();
@@ -107,7 +114,7 @@
continue;
}
try {
- deletedKeysBTreeAccessors.get(i).search(deletedKeysBTreeCursors[i], keySearchPred);
+ deletedKeysBTreeAccessors[i].search(deletedKeysBTreeCursors[i], keySearchPred);
if (deletedKeysBTreeCursors[i].hasNext()) {
return true;
}
@@ -155,9 +162,9 @@
currentCursor.close();
accessorIndex++;
}
- while (accessorIndex < indexAccessors.size()) {
+ while (accessorIndex < indexAccessors.length) {
// Current cursor has been exhausted, switch to next accessor/cursor.
- currentAccessor = indexAccessors.get(accessorIndex);
+ currentAccessor = indexAccessors[accessorIndex];
currentCursor = currentAccessor.createSearchCursor(false);
currentAccessor.search(currentCursor, searchPred);
if (nextValidTuple()) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursorInitialState.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursorInitialState.java
index 7b6e446..578009f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursorInitialState.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursorInitialState.java
@@ -28,7 +28,6 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.common.ICursorInitialState;
-import org.apache.hyracks.storage.common.IIndexAccessor;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.MultiComparator;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
@@ -39,8 +38,6 @@
private final boolean includeMemComponent;
private final ILSMHarness lsmHarness;
- private final List<IIndexAccessor> indexAccessors;
- private final List<IIndexAccessor> deletedKeysBTreeAccessors;
private final LSMInvertedIndexOpContext ctx;
private ISearchOperationCallback searchCallback;
private MultiComparator originalCmp;
@@ -57,18 +54,15 @@
private int invListNumElements = INVALID_VALUE;
public LSMInvertedIndexSearchCursorInitialState() {
- this(null, null, null, null, null, null, false, null, null);
+ this(null, null, null, null, false, null, null);
resetInvertedListInfo();
}
public LSMInvertedIndexSearchCursorInitialState(final MultiComparator keyCmp, PermutingTupleReference keysOnlyTuple,
- List<IIndexAccessor> indexAccessors, List<IIndexAccessor> deletedKeysBTreeAccessors,
ITreeIndexFrameFactory deletedKeysBtreeLeafFrameFactory, IIndexOperationContext ctx,
boolean includeMemComponent, ILSMHarness lsmHarness, List<ILSMComponent> operationalComponents) {
this.keyCmp = keyCmp;
this.keysOnlyTuple = keysOnlyTuple;
- this.indexAccessors = indexAccessors;
- this.deletedKeysBTreeAccessors = deletedKeysBTreeAccessors;
this.deletedKeysBtreeLeafFrameFactory = deletedKeysBtreeLeafFrameFactory;
this.includeMemComponent = includeMemComponent;
this.operationalComponents = operationalComponents;
@@ -91,10 +85,6 @@
return operationalComponents;
}
- public List<IIndexAccessor> getIndexAccessors() {
- return indexAccessors;
- }
-
public boolean getIncludeMemComponent() {
return includeMemComponent;
}
@@ -131,10 +121,6 @@
return keyCmp;
}
- public List<IIndexAccessor> getDeletedKeysBTreeAccessors() {
- return deletedKeysBTreeAccessors;
- }
-
public ITreeIndexFrameFactory getgetDeletedKeysBTreeLeafFrameFactory() {
return deletedKeysBtreeLeafFrameFactory;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
index 21d1acf..a7ce35c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
@@ -38,6 +38,7 @@
import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
@@ -158,7 +159,7 @@
}
@Override
- public InvertedListCursor createInvertedListRangeSearchCursor() {
+ public InvertedListCursor createInvertedListRangeSearchCursor(IIndexCursorStats stats) {
// An in-memory index does not have a separate inverted list.
// Therefore, a different range-search cursor for an inverted list is not required.
return createInvertedListCursor(null);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeElementInvertedListCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeElementInvertedListCursor.java
index 7f3d12f..6002dc8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeElementInvertedListCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeElementInvertedListCursor.java
@@ -36,6 +36,7 @@
import org.apache.hyracks.dataflow.common.utils.TaskUtil;
import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager;
import org.apache.hyracks.storage.am.lsm.invertedindex.api.InvertedListCursor;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.MultiComparator;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
@@ -79,9 +80,10 @@
private boolean moreBlocksToRead = true;
// The last searched element index (used for random traversal)
private int lastRandomSearchedElementIx;
+ private final IIndexCursorStats stats;
public FixedSizeElementInvertedListCursor(IBufferCache bufferCache, int fileId, ITypeTraits[] invListFields,
- IHyracksTaskContext ctx) throws HyracksDataException {
+ IHyracksTaskContext ctx, IIndexCursorStats stats) throws HyracksDataException {
this.bufferCache = bufferCache;
this.fileId = fileId;
int tmpSize = 0;
@@ -109,6 +111,7 @@
if (bufferManagerForSearch == null) {
throw HyracksDataException.create(ErrorCode.CANNOT_CONTINUE_TEXT_SEARCH_BUFFER_MANAGER_IS_NULL);
}
+ this.stats = stats;
}
/**
@@ -216,6 +219,7 @@
ByteBuffer tmpBuffer;
for (int i = bufferStartPageId; i <= endPageId; i++) {
page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, i), false);
+ stats.getPageCounter().update(1);
// Copies the content to the buffer (working memory).
// Assumption: processing inverted list takes time; so, we don't want to keep them on the buffer cache.
// Rather, we utilize the assigned working memory (buffers).
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeElementInvertedListScanCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeElementInvertedListScanCursor.java
index 2401c67..583fb90 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeElementInvertedListScanCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeElementInvertedListScanCursor.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.lsm.invertedindex.api.InvertedListCursor;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.MultiComparator;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
@@ -54,9 +55,10 @@
protected boolean pinned;
protected int pinnedPageId = -1;
+ protected final IIndexCursorStats stats;
- public FixedSizeElementInvertedListScanCursor(IBufferCache bufferCache, int fileId, ITypeTraits[] invListFields)
- throws HyracksDataException {
+ public FixedSizeElementInvertedListScanCursor(IBufferCache bufferCache, int fileId, ITypeTraits[] invListFields,
+ IIndexCursorStats stats) throws HyracksDataException {
this.bufferCache = bufferCache;
this.fileId = fileId;
int tmpSize = 0;
@@ -74,6 +76,7 @@
this.numPages = 0;
this.tuple = new FixedSizeTupleReference(invListFields);
this.pinned = false;
+ this.stats = stats;
}
@Override
@@ -117,6 +120,8 @@
page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, currentPageId), false);
pinnedPageId = currentPageId;
pinned = true;
+ stats.getPageCounter().update(1);
+
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index 2996bcf..42171d1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -56,8 +56,10 @@
import org.apache.hyracks.storage.common.IIndexAccessor;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.NoOpIndexCursorStats;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.buffercache.IFIFOPageWriter;
@@ -188,12 +190,13 @@
@Override
public InvertedListCursor createInvertedListCursor(IHyracksTaskContext ctx) throws HyracksDataException {
- return new FixedSizeElementInvertedListCursor(bufferCache, fileId, invListTypeTraits, ctx);
+ return new FixedSizeElementInvertedListCursor(bufferCache, fileId, invListTypeTraits, ctx,
+ NoOpIndexCursorStats.INSTANCE);
}
@Override
- public InvertedListCursor createInvertedListRangeSearchCursor() throws HyracksDataException {
- return new FixedSizeElementInvertedListScanCursor(bufferCache, fileId, invListTypeTraits);
+ public InvertedListCursor createInvertedListRangeSearchCursor(IIndexCursorStats stats) throws HyracksDataException {
+ return new FixedSizeElementInvertedListScanCursor(bufferCache, fileId, invListTypeTraits, stats);
}
@Override
@@ -482,12 +485,14 @@
protected final IHyracksTaskContext ctx;
protected IInvertedIndexSearcher searcher;
private boolean destroyed = false;
+ protected final IIndexAccessParameters iap;
- public OnDiskInvertedIndexAccessor(OnDiskInvertedIndex index, IHyracksTaskContext ctx)
+ public OnDiskInvertedIndexAccessor(OnDiskInvertedIndex index, IIndexAccessParameters iap)
throws HyracksDataException {
this.index = index;
- this.ctx = ctx;
+ this.ctx = (IHyracksTaskContext) iap.getParameters().get(HyracksConstants.HYRACKS_TASK_CONTEXT);
this.opCtx = new OnDiskInvertedIndexOpContext(btree);
+ this.iap = iap;
}
@Override
@@ -519,7 +524,8 @@
@Override
public IIndexCursor createRangeSearchCursor() throws HyracksDataException {
- return new OnDiskInvertedIndexRangeSearchCursor(index, opCtx);
+ return new OnDiskInvertedIndexRangeSearchCursor(index, opCtx, (IIndexCursorStats) iap.getParameters()
+ .getOrDefault(HyracksConstants.INDEX_CURSOR_STATS, NoOpIndexCursorStats.INSTANCE));
}
@Override
@@ -560,8 +566,7 @@
@Override
public OnDiskInvertedIndexAccessor createAccessor(IIndexAccessParameters iap) throws HyracksDataException {
- return new OnDiskInvertedIndexAccessor(this,
- (IHyracksTaskContext) iap.getParameters().get(HyracksConstants.HYRACKS_TASK_CONTEXT));
+ return new OnDiskInvertedIndexAccessor(this, iap);
}
@Override
@@ -613,7 +618,7 @@
ArrayTupleReference prevTuple = new ArrayTupleReference();
IInvertedIndexAccessor invIndexAccessor = createAccessor(NoOpIndexAccessParameters.INSTANCE);
try {
- InvertedListCursor invListCursor = createInvertedListRangeSearchCursor();
+ InvertedListCursor invListCursor = createInvertedListRangeSearchCursor(NoOpIndexCursorStats.INSTANCE);
MultiComparator invListCmp = MultiComparator.create(invListCmpFactories);
while (btreeCursor.hasNext()) {
btreeCursor.next();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexRangeSearchCursor.java
index 6d29e75..8ec9876 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexRangeSearchCursor.java
@@ -24,13 +24,15 @@
import org.apache.hyracks.storage.am.btree.impls.BTree;
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
-import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
import org.apache.hyracks.storage.am.lsm.invertedindex.api.InvertedListCursor;
import org.apache.hyracks.storage.am.lsm.invertedindex.tuples.TokenKeyPairTuple;
import org.apache.hyracks.storage.common.EnforcedIndexCursor;
import org.apache.hyracks.storage.common.ICursorInitialState;
+import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.IIndexAccessor;
import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchPredicate;
/**
@@ -51,10 +53,12 @@
private final PermutingTupleReference tokenTuple;
private final TokenKeyPairTuple resultTuple;
- public OnDiskInvertedIndexRangeSearchCursor(OnDiskInvertedIndex invIndex, IIndexOperationContext opCtx)
- throws HyracksDataException {
+ public OnDiskInvertedIndexRangeSearchCursor(OnDiskInvertedIndex invIndex, IIndexOperationContext opCtx,
+ IIndexCursorStats stats) throws HyracksDataException {
this.btree = invIndex.getBTree();
- this.btreeAccessor = btree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+
+ IIndexAccessParameters iap = IndexAccessParameters.createNoOpParams(stats);
+ this.btreeAccessor = btree.createAccessor(iap);
this.invIndex = invIndex;
this.opCtx = opCtx;
// Project away non-token fields of the BTree tuples.
@@ -65,7 +69,7 @@
tokenTuple = new PermutingTupleReference(fieldPermutation);
btreeCursor = btreeAccessor.createSearchCursor(false);
resultTuple = new TokenKeyPairTuple(invIndex.getTokenTypeTraits().length, btree.getCmpFactories().length);
- invListRangeSearchCursor = invIndex.createInvertedListRangeSearchCursor();
+ invListRangeSearchCursor = invIndex.createInvertedListRangeSearchCursor(stats);
isInvListCursorOpen = false;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
index 8c6b386..1521887 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
@@ -19,12 +19,10 @@
package org.apache.hyracks.storage.am.lsm.invertedindex.ondisk;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.data.std.primitive.ShortPointable;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
@@ -54,9 +52,9 @@
}
public class PartitionedOnDiskInvertedIndexAccessor extends OnDiskInvertedIndexAccessor {
- public PartitionedOnDiskInvertedIndexAccessor(OnDiskInvertedIndex index, IHyracksTaskContext ctx)
+ public PartitionedOnDiskInvertedIndexAccessor(OnDiskInvertedIndex index, IIndexAccessParameters iap)
throws HyracksDataException {
- super(index, ctx);
+ super(index, iap);
}
@Override
@@ -79,8 +77,7 @@
@Override
public PartitionedOnDiskInvertedIndexAccessor createAccessor(IIndexAccessParameters iap)
throws HyracksDataException {
- return new PartitionedOnDiskInvertedIndexAccessor(this,
- (IHyracksTaskContext) iap.getParameters().get(HyracksConstants.HYRACKS_TASK_CONTEXT));
+ return new PartitionedOnDiskInvertedIndexAccessor(this, iap);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
index 24c2dbe..5557bbe 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
@@ -57,8 +57,10 @@
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.IndexCursorStats;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
@@ -276,7 +278,8 @@
// Keep the deleted tuples since the oldest disk component is not
// included in the merge operation
- LSMRTreeDeletedKeysBTreeMergeCursor btreeCursor = new LSMRTreeDeletedKeysBTreeMergeCursor(opCtx);
+ LSMRTreeDeletedKeysBTreeMergeCursor btreeCursor =
+ new LSMRTreeDeletedKeysBTreeMergeCursor(opCtx, mergeOp.getCursorStats());
search(opCtx, btreeCursor, rtreeSearchPred);
BTree btree = mergedComponent.getBuddyIndex();
@@ -567,14 +570,15 @@
ILSMIndexOperationContext rctx = createOpContext(NoOpOperationCallback.INSTANCE, -1);
rctx.setOperation(IndexOperation.MERGE);
List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
- LSMRTreeSortedCursor cursor = new LSMRTreeSortedCursor(rctx, linearizer, buddyBTreeFields);
+ IIndexCursorStats stats = new IndexCursorStats();
+ LSMRTreeSortedCursor cursor = new LSMRTreeSortedCursor(rctx, linearizer, buddyBTreeFields, stats);
LSMComponentFileReferences relMergeFileRefs =
getMergeFileReferences((ILSMDiskComponent) mergingComponents.get(mergingComponents.size() - 1),
(ILSMDiskComponent) mergingComponents.get(0));
ILSMIndexAccessor accessor = new LSMRTreeAccessor(getHarness(), rctx, buddyBTreeFields);
// create the merge operation.
LSMRTreeMergeOperation mergeOp =
- new LSMRTreeMergeOperation(accessor, cursor, relMergeFileRefs.getInsertIndexFileReference(),
+ new LSMRTreeMergeOperation(accessor, cursor, stats, relMergeFileRefs.getInsertIndexFileReference(),
relMergeFileRefs.getDeleteIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(),
ioOpCallback, fileManager.getBaseDir().getAbsolutePath());
ioOpCallback.scheduled(mergeOp);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 05182b7..92449e4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -64,7 +64,9 @@
import org.apache.hyracks.storage.am.rtree.impls.SearchPredicate;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.IndexCursorStats;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.util.trace.ITracer;
@@ -337,7 +339,7 @@
}
componentBulkLoader = mergedComponent.createBulkLoader(mergeOp, 1.0f, false, numElements, false,
false, false, pageWriteCallbackFactory.createPageWriteCallback());
- mergeLoadBTree(opCtx, rtreeSearchPred, componentBulkLoader);
+ mergeLoadBTree(mergeOp, opCtx, rtreeSearchPred, componentBulkLoader);
} else {
//no buddy-btree needed
componentBulkLoader = mergedComponent.createBulkLoader(mergeOp, 1.0f, false, 0L, false, false,
@@ -361,9 +363,11 @@
return componentBulkLoader;
}
- private void mergeLoadBTree(ILSMIndexOperationContext opCtx, ISearchPredicate rtreeSearchPred,
- ILSMDiskComponentBulkLoader componentBulkLoader) throws HyracksDataException {
- LSMRTreeDeletedKeysBTreeMergeCursor btreeCursor = new LSMRTreeDeletedKeysBTreeMergeCursor(opCtx);
+ private void mergeLoadBTree(LSMRTreeMergeOperation mergeOp, ILSMIndexOperationContext opCtx,
+ ISearchPredicate rtreeSearchPred, ILSMDiskComponentBulkLoader componentBulkLoader)
+ throws HyracksDataException {
+ LSMRTreeDeletedKeysBTreeMergeCursor btreeCursor =
+ new LSMRTreeDeletedKeysBTreeMergeCursor(opCtx, mergeOp.getCursorStats());
try {
search(opCtx, btreeCursor, rtreeSearchPred);
try {
@@ -434,9 +438,10 @@
@Override
protected ILSMIOOperation createMergeOperation(AbstractLSMIndexOperationContext opCtx,
LSMComponentFileReferences mergeFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException {
- LSMRTreeSortedCursor cursor = new LSMRTreeSortedCursor(opCtx, linearizer, buddyBTreeFields);
+ IIndexCursorStats stats = new IndexCursorStats();
+ LSMRTreeSortedCursor cursor = new LSMRTreeSortedCursor(opCtx, linearizer, buddyBTreeFields, stats);
ILSMIndexAccessor accessor = new LSMRTreeAccessor(getHarness(), opCtx, buddyBTreeFields);
- return new LSMRTreeMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(),
+ return new LSMRTreeMergeOperation(accessor, cursor, stats, mergeFileRefs.getInsertIndexFileReference(),
mergeFileRefs.getDeleteIndexFileReference(), mergeFileRefs.getBloomFilterFileReference(), callback,
getIndexIdentifier());
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java
index 176f767..c2644c1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeAbstractCursor.java
@@ -23,26 +23,25 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
-import org.apache.hyracks.storage.am.btree.api.IBTreeLeafFrame;
import org.apache.hyracks.storage.am.btree.impls.BTree;
import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
import org.apache.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
import org.apache.hyracks.storage.am.common.api.ILSMIndexCursor;
+import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
-import org.apache.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
-import org.apache.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
import org.apache.hyracks.storage.am.rtree.impls.RTree;
import org.apache.hyracks.storage.am.rtree.impls.RTree.RTreeAccessor;
import org.apache.hyracks.storage.am.rtree.impls.RTreeSearchCursor;
import org.apache.hyracks.storage.am.rtree.impls.SearchPredicate;
import org.apache.hyracks.storage.common.EnforcedIndexCursor;
import org.apache.hyracks.storage.common.ICursorInitialState;
+import org.apache.hyracks.storage.common.IIndexAccessParameters;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
@@ -67,10 +66,12 @@
protected ISearchOperationCallback searchCallback;
protected List<ILSMComponent> operationalComponents;
protected long[] hashes = BloomFilter.createHashArray();
+ protected final IIndexAccessParameters iap;
- public LSMRTreeAbstractCursor(ILSMIndexOperationContext opCtx) {
+ public LSMRTreeAbstractCursor(ILSMIndexOperationContext opCtx, IIndexCursorStats stats) {
this.opCtx = opCtx;
btreeRangePredicate = new RangePredicate(null, null, true, true, null, null);
+ this.iap = IndexAccessParameters.createNoOpParams(stats);
}
public RTreeSearchCursor getCursor(int cursorIndex) {
@@ -108,43 +109,33 @@
if (component.getType() == LSMComponentType.MEMORY) {
includeMutableComponent = true;
// No need for a bloom filter for the in-memory BTree.
- if (btreeCursors[i] == null) {
- //create
- btreeCursors[i] = new BTreeRangeSearchCursor(
- (IBTreeLeafFrame) lsmInitialState.getBTreeLeafFrameFactory().createFrame(), false);
- } else {
- //re-use
- btreeCursors[i].close();
- }
rtree = ((LSMRTreeMemoryComponent) component).getIndex();
btree = ((LSMRTreeMemoryComponent) component).getBuddyIndex();
bloomFilters[i] = null;
} else {
- if (btreeCursors[i] == null) {
- // need to create a new one
- btreeCursors[i] = new BTreeRangeSearchCursor(
- (IBTreeLeafFrame) lsmInitialState.getBTreeLeafFrameFactory().createFrame(), false);
- } else {
- // close
- btreeCursors[i].close();
- }
rtree = ((LSMRTreeDiskComponent) component).getIndex();
btree = ((LSMRTreeDiskComponent) component).getBuddyIndex();
bloomFilters[i] = ((LSMRTreeDiskComponent) component).getBloomFilter();
}
+ if (rtreeAccessors[i] == null) {
+ rtreeAccessors[i] = rtree.createAccessor(iap);
+ // do not count the random I/Os incurred by btree lookups
+ btreeAccessors[i] = btree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ } else {
+ rtreeAccessors[i].reset(rtree, iap);
+ btreeAccessors[i].reset(btree, iap);
+ }
if (rtreeCursors[i] == null) {
- rtreeCursors[i] = new RTreeSearchCursor(
- (IRTreeInteriorFrame) lsmInitialState.getRTreeInteriorFrameFactory().createFrame(),
- (IRTreeLeafFrame) lsmInitialState.getRTreeLeafFrameFactory().createFrame());
+ rtreeCursors[i] = rtreeAccessors[i].createSearchCursor(false);
} else {
rtreeCursors[i].close();
}
- if (rtreeAccessors[i] == null) {
- rtreeAccessors[i] = rtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- btreeAccessors[i] = btree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ if (btreeCursors[i] == null) {
+ // need to create a new one
+ btreeCursors[i] = btreeAccessors[i].createPointCursor(false);
} else {
- rtreeAccessors[i].reset(rtree, NoOpOperationCallback.INSTANCE);
- btreeAccessors[i].reset(btree, NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ // close
+ btreeCursors[i].close();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java
index b57b517..5b0c53d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java
@@ -20,25 +20,23 @@
package org.apache.hyracks.storage.am.lsm.rtree.impls;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.btree.api.IBTreeLeafFrame;
import org.apache.hyracks.storage.am.btree.impls.BTree;
-import org.apache.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
import org.apache.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
import org.apache.hyracks.storage.common.ICursorInitialState;
import org.apache.hyracks.storage.common.IIndexAccessor;
import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.util.IndexCursorUtils;
public class LSMRTreeDeletedKeysBTreeMergeCursor extends LSMIndexSearchCursor {
- public LSMRTreeDeletedKeysBTreeMergeCursor(ILSMIndexOperationContext opCtx) {
- super(opCtx, true);
+ public LSMRTreeDeletedKeysBTreeMergeCursor(ILSMIndexOperationContext opCtx, IIndexCursorStats stats) {
+ super(opCtx, true, stats);
}
@Override
@@ -60,11 +58,10 @@
IIndexAccessor[] btreeAccessors = new ITreeIndexAccessor[numBTrees];
for (int i = 0; i < numBTrees; i++) {
ILSMComponent component = operationalComponents.get(i);
- IBTreeLeafFrame leafFrame = (IBTreeLeafFrame) lsmInitialState.getBTreeLeafFrameFactory().createFrame();
- rangeCursors[i] = new BTreeRangeSearchCursor(leafFrame, false);
BTree btree = ((LSMRTreeDiskComponent) component).getBuddyIndex();
- btreeAccessors[i] = btree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ btreeAccessors[i] = btree.createAccessor(iap);
btreeAccessors[i].search(rangeCursors[i], btreePredicate);
+ rangeCursors[i] = btreeAccessors[i].createSearchCursor(false);
}
IndexCursorUtils.open(btreeAccessors, rangeCursors, btreePredicate);
try {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
index aed5981..1246454 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
@@ -24,15 +24,16 @@
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation;
import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
public class LSMRTreeMergeOperation extends MergeOperation {
private final FileReference btreeMergeTarget;
private final FileReference bloomFilterMergeTarget;
- public LSMRTreeMergeOperation(ILSMIndexAccessor accessor, IIndexCursor cursor, FileReference target,
- FileReference btreeMergeTarget, FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback,
- String indexIdentifier) {
- super(accessor, target, callback, indexIdentifier, cursor);
+ public LSMRTreeMergeOperation(ILSMIndexAccessor accessor, IIndexCursor cursor, IIndexCursorStats stats,
+ FileReference target, FileReference btreeMergeTarget, FileReference bloomFilterMergeTarget,
+ ILSMIOOperationCallback callback, String indexIdentifier) {
+ super(accessor, target, callback, indexIdentifier, cursor, stats);
this.btreeMergeTarget = btreeMergeTarget;
this.bloomFilterMergeTarget = bloomFilterMergeTarget;
}
@@ -49,4 +50,5 @@
public LSMComponentFileReferences getComponentFiles() {
return new LSMComponentFileReferences(btreeMergeTarget, null, bloomFilterMergeTarget);
}
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
index a8bc1dc..a568351 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.common.ICursorInitialState;
import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.NoOpIndexCursorStats;
public class LSMRTreeSearchCursor extends LSMRTreeAbstractCursor {
@@ -34,7 +35,7 @@
private boolean resultOfsearchCallbackProceed = false;
public LSMRTreeSearchCursor(ILSMIndexOperationContext opCtx, int[] buddyBTreeFields) {
- super(opCtx);
+ super(opCtx, NoOpIndexCursorStats.INSTANCE);
currentCursor = 0;
this.btreeTuple = new PermutingTupleReference(buddyBTreeFields);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
index 3c96875..ef3fb06 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
@@ -27,6 +27,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.common.ICursorInitialState;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.util.IndexCursorUtils;
@@ -40,8 +41,8 @@
private PermutingTupleReference btreeTuple;
public LSMRTreeSortedCursor(ILSMIndexOperationContext opCtx, ILinearizeComparatorFactory linearizer,
- int[] buddyBTreeFields) throws HyracksDataException {
- super(opCtx);
+ int[] buddyBTreeFields, IIndexCursorStats stats) throws HyracksDataException {
+ super(opCtx, stats);
this.linearizeCmp = linearizer.createBinaryComparator();
this.btreeTuple = new PermutingTupleReference(buddyBTreeFields);
close();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 20cd927..8e5cb35 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -63,7 +63,9 @@
import org.apache.hyracks.storage.am.rtree.impls.SearchPredicate;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.IndexCursorStats;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree {
@@ -287,10 +289,11 @@
if (mergingComponents.get(mergingComponents.size() - 1) != diskComponents.get(diskComponents.size() - 1)) {
returnDeletedTuples = true;
}
+ IIndexCursorStats stats = new IndexCursorStats();
LSMRTreeWithAntiMatterTuplesSearchCursor cursor =
- new LSMRTreeWithAntiMatterTuplesSearchCursor(opCtx, returnDeletedTuples);
+ new LSMRTreeWithAntiMatterTuplesSearchCursor(opCtx, returnDeletedTuples, stats);
ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getHarness(), opCtx, cursorFactory);
- return new LSMRTreeMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(), null, null,
- callback, getIndexIdentifier());
+ return new LSMRTreeMergeOperation(accessor, cursor, stats, mergeFileRefs.getInsertIndexFileReference(), null,
+ null, callback, getIndexIdentifier());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
index 05dc3ef..7bfa7fe 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
@@ -21,7 +21,6 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.btree.api.IBTreeLeafFrame;
import org.apache.hyracks.storage.am.btree.impls.BTree;
import org.apache.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
@@ -33,15 +32,15 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
-import org.apache.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
-import org.apache.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
import org.apache.hyracks.storage.am.rtree.impls.RTree;
import org.apache.hyracks.storage.am.rtree.impls.RTreeSearchCursor;
import org.apache.hyracks.storage.am.rtree.impls.SearchPredicate;
import org.apache.hyracks.storage.common.ICursorInitialState;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.NoOpIndexCursorStats;
import org.apache.hyracks.storage.common.util.IndexCursorUtils;
public class LSMRTreeWithAntiMatterTuplesSearchCursor extends LSMIndexSearchCursor {
@@ -63,11 +62,12 @@
private boolean resultOfsearchCallBackProceed = false;
public LSMRTreeWithAntiMatterTuplesSearchCursor(ILSMIndexOperationContext opCtx) {
- this(opCtx, false);
+ this(opCtx, false, NoOpIndexCursorStats.INSTANCE);
}
- public LSMRTreeWithAntiMatterTuplesSearchCursor(ILSMIndexOperationContext opCtx, boolean returnDeletedTuples) {
- super(opCtx, returnDeletedTuples);
+ public LSMRTreeWithAntiMatterTuplesSearchCursor(ILSMIndexOperationContext opCtx, boolean returnDeletedTuples,
+ IIndexCursorStats stats) {
+ super(opCtx, returnDeletedTuples, stats);
currentCursor = 0;
}
@@ -102,13 +102,10 @@
ILSMComponent component = operationalComponents.get(i);
RTree rtree = ((LSMRTreeMemoryComponent) component).getIndex();
BTree btree = ((LSMRTreeMemoryComponent) component).getBuddyIndex();
- mutableRTreeCursors[i] = new RTreeSearchCursor(
- (IRTreeInteriorFrame) lsmInitialState.getRTreeInteriorFrameFactory().createFrame(),
- (IRTreeLeafFrame) lsmInitialState.getRTreeLeafFrameFactory().createFrame());
- btreeCursors[i] = new BTreeRangeSearchCursor(
- (IBTreeLeafFrame) lsmInitialState.getBTreeLeafFrameFactory().createFrame(), false);
btreeAccessors[i] = btree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
mutableRTreeAccessors[i] = rtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ btreeCursors[i] = (ITreeIndexCursor) btreeAccessors[i].createSearchCursor(false);
+ mutableRTreeCursors[i] = (RTreeSearchCursor) mutableRTreeAccessors[i].createSearchCursor(false);
}
rangeCursors = new RTreeSearchCursor[numImmutableComponents];
@@ -117,11 +114,10 @@
try {
for (int i = numMemoryComponents; i < operationalComponents.size(); i++) {
ILSMComponent component = operationalComponents.get(i);
- rangeCursors[j] = new RTreeSearchCursor(
- (IRTreeInteriorFrame) lsmInitialState.getRTreeInteriorFrameFactory().createFrame(),
- (IRTreeLeafFrame) lsmInitialState.getRTreeLeafFrameFactory().createFrame());
+
RTree rtree = ((LSMRTreeWithAntimatterDiskComponent) component).getIndex();
- immutableRTreeAccessors[j] = rtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ immutableRTreeAccessors[j] = rtree.createAccessor(iap);
+ rangeCursors[j] = immutableRTreeAccessors[j].createSearchCursor(false);
immutableRTreeAccessors[j].search(rangeCursors[j], searchPred);
j++;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
index f31049a..7e8f249 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
import org.apache.hyracks.storage.am.common.api.IPageManager;
@@ -53,10 +54,11 @@
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
-import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.NoOpIndexCursorStats;
import org.apache.hyracks.storage.common.buffercache.BufferCache;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
@@ -753,23 +755,24 @@
@Override
public RTreeAccessor createAccessor(IIndexAccessParameters iap) {
- return new RTreeAccessor(this, iap.getModificationCallback(), iap.getSearchOperationCallback());
+ return new RTreeAccessor(this, iap);
}
public class RTreeAccessor implements ITreeIndexAccessor {
private RTree rtree;
private RTreeOpContext ctx;
private boolean destroyed = false;
+ private IIndexAccessParameters iap;
- public RTreeAccessor(RTree rtree, IModificationOperationCallback modificationCallback,
- ISearchOperationCallback searchCallback) {
+ public RTreeAccessor(RTree rtree, IIndexAccessParameters iap) {
this.rtree = rtree;
- this.ctx = rtree.createOpContext(modificationCallback);
+ this.ctx = rtree.createOpContext(iap.getModificationCallback());
+ this.iap = iap;
}
- public void reset(RTree rtree, IModificationOperationCallback modificationCallback) {
+ public void reset(RTree rtree, IIndexAccessParameters iap) {
this.rtree = rtree;
- ctx.setModificationCallback(modificationCallback);
+ ctx.setModificationCallback(iap.getModificationCallback());
ctx.reset();
}
@@ -794,7 +797,8 @@
@Override
public RTreeSearchCursor createSearchCursor(boolean exclusive) {
return new RTreeSearchCursor((IRTreeInteriorFrame) interiorFrameFactory.createFrame(),
- (IRTreeLeafFrame) leafFrameFactory.createFrame());
+ (IRTreeLeafFrame) leafFrameFactory.createFrame(), (IIndexCursorStats) iap.getParameters()
+ .getOrDefault(HyracksConstants.INDEX_CURSOR_STATS, NoOpIndexCursorStats.INSTANCE));
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java
index f85c044..80fc5bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java
@@ -27,8 +27,10 @@
import org.apache.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
import org.apache.hyracks.storage.common.EnforcedIndexCursor;
import org.apache.hyracks.storage.common.ICursorInitialState;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.NoOpIndexCursorStats;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
@@ -56,10 +58,17 @@
private ITreeIndexTupleReference frameTuple;
private boolean readLatched = false;
+ private final IIndexCursorStats stats;
+
public RTreeSearchCursor(IRTreeInteriorFrame interiorFrame, IRTreeLeafFrame leafFrame) {
+ this(interiorFrame, leafFrame, NoOpIndexCursorStats.INSTANCE);
+ }
+
+ public RTreeSearchCursor(IRTreeInteriorFrame interiorFrame, IRTreeLeafFrame leafFrame, IIndexCursorStats stats) {
this.interiorFrame = interiorFrame;
this.leafFrame = leafFrame;
this.frameTuple = leafFrame.createTupleReference();
+ this.stats = stats;
}
@Override
@@ -108,6 +117,7 @@
}
ICachedPage node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
node.acquireReadLatch();
+ stats.getPageCounter().update(1);
readLatched = true;
try {
interiorFrame.setPage(node);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursorStats.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursorStats.java
new file mode 100644
index 0000000..d21c6c7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursorStats.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.common;
+
+import org.apache.hyracks.api.job.profiling.counters.ICounter;
+
+/**
+ * Basic operation stats for an index cursor
+ */
+public interface IIndexCursorStats {
+ ICounter getPageCounter();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IndexCursorStats.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IndexCursorStats.java
new file mode 100644
index 0000000..e389dc7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IndexCursorStats.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.common;
+
+import org.apache.hyracks.api.com.job.profiling.counters.Counter;
+import org.apache.hyracks.api.job.profiling.counters.ICounter;
+
+public class IndexCursorStats implements IIndexCursorStats {
+
+ private final ICounter pageCounter = new Counter("pageCounter");
+
+ @Override
+ public ICounter getPageCounter() {
+ return pageCounter;
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/NoOpIndexCursorStats.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/NoOpIndexCursorStats.java
new file mode 100644
index 0000000..41ba84f
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/NoOpIndexCursorStats.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.common;
+
+import org.apache.hyracks.api.job.profiling.counters.ICounter;
+
+public class NoOpIndexCursorStats implements IIndexCursorStats {
+
+ public static final IIndexCursorStats INSTANCE = new NoOpIndexCursorStats();
+
+ private static final ICounter NOOP_COUNTER = new ICounter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long update(long delta) {
+ return 0;
+ }
+
+ @Override
+ public long set(long value) {
+ return 0;
+ }
+
+ @Override
+ public String getName() {
+ return null;
+ }
+
+ @Override
+ public long get() {
+ return 0;
+ }
+ };
+
+ private NoOpIndexCursorStats() {
+ }
+
+ @Override
+ public ICounter getPageCounter() {
+ return NOOP_COUNTER;
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TreeIndexTestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TreeIndexTestUtils.java
index 43d1504..0c7b5f9 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TreeIndexTestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TreeIndexTestUtils.java
@@ -41,12 +41,17 @@
import org.apache.hyracks.dataflow.common.utils.TupleUtils;
import org.apache.hyracks.storage.am.common.api.ITreeIndexAccessor;
import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
+import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.buffercache.NoOpPageWriteCallback;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.junit.Assert;
@SuppressWarnings("rawtypes")
public abstract class TreeIndexTestUtils {
@@ -388,4 +393,15 @@
}
}
+ public static void checkCursorStats(ILSMIOOperation op) {
+ if (op.getIOOpertionType() == LSMIOOperationType.MERGE) {
+ MergeOperation mergeOp = (MergeOperation) op;
+ IIndexCursorStats stats = mergeOp.getCursorStats();
+ Assert.assertTrue(stats.getPageCounter().get() > 0);
+ // Index cursor stats are only an (conservative) approximation of the number of pages of
+ // merging components. Thus, there could be some left over pages.
+ Assert.assertTrue(stats.getPageCounter().get() >= 0);
+ }
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeFailTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeFailTest.java
index f3ae8a8..bd5d97d 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeFailTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeFailTest.java
@@ -127,6 +127,11 @@
}
}
+ @Override
+ public void completeOperation(ILSMIOOperation operation) throws HyracksDataException {
+ // No op
+ }
+
private void modifyOperation(ILSMIOOperation operation) throws Exception {
if (!(operation instanceof MergeOperation)) {
return;
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
index 7c59671..8376065 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
@@ -27,8 +27,10 @@
import org.apache.hyracks.storage.am.btree.OrderedIndexTestDriver;
import org.apache.hyracks.storage.am.btree.OrderedIndexTestUtils;
import org.apache.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
+import org.apache.hyracks.storage.am.common.TreeIndexTestUtils;
import org.apache.hyracks.storage.am.config.AccessMethodTestsConfig;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
@SuppressWarnings("rawtypes")
@@ -74,7 +76,8 @@
}
ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
- accessor.scheduleMerge(((LSMBTree) ctx.getIndex()).getDiskComponents());
+ ILSMIOOperation mergeOp = accessor.scheduleMerge(((LSMBTree) ctx.getIndex()).getDiskComponents());
+ mergeOp.addCompleteListener(op -> TreeIndexTestUtils.checkCursorStats(op));
orderedIndexTestUtils.checkPointSearches(ctx);
orderedIndexTestUtils.checkScan(ctx);
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/GreedySchedulerTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/GreedySchedulerTest.java
new file mode 100644
index 0000000..d03f7a5
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/GreedySchedulerTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
+import org.apache.hyracks.storage.am.lsm.common.impls.GreedyScheduler;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoOpIoOperationFailedCallback;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class GreedySchedulerTest {
+
+ private static final String INDEX_1 = "index1";
+ private static final String INDEX_2 = "index2";
+
+ private final Object lock = new Object();
+
+ @Test
+ public void test() throws Exception {
+ GreedyScheduler scheduler = new GreedyScheduler(r -> new Thread(r), NoOpIoOperationFailedCallback.INSTANCE);
+ AtomicBoolean active1 = new AtomicBoolean(true);
+ ILSMIOOperation op1 = mockMergeOperation(INDEX_1, 10, active1);
+
+ scheduler.scheduleOperation(op1);
+ // op1 is activated
+ Assert.assertTrue(active1.get());
+
+ AtomicBoolean active2 = new AtomicBoolean(true);
+ ILSMIOOperation op2 = mockMergeOperation(INDEX_2, 5, active2);
+ scheduler.scheduleOperation(op2);
+ // op2 does not interactive with op1s
+ Assert.assertTrue(active1.get());
+ Assert.assertTrue(active2.get());
+
+ scheduler.completeOperation(op2);
+ Assert.assertTrue(active1.get());
+
+ AtomicBoolean active3 = new AtomicBoolean(true);
+ ILSMIOOperation op3 = mockMergeOperation(INDEX_1, 5, active3);
+ scheduler.scheduleOperation(op3);
+ Assert.assertTrue(active3.get());
+ Assert.assertFalse(active1.get());
+
+ AtomicBoolean active4 = new AtomicBoolean(true);
+ ILSMIOOperation op4 = mockMergeOperation(INDEX_1, 7, active4);
+ scheduler.scheduleOperation(op4);
+ // op3 is still active
+ Assert.assertFalse(active1.get());
+ Assert.assertTrue(active3.get());
+ Assert.assertFalse(active4.get());
+
+ // suppose op1 is completed (though unlikely in practice), now op3 is still active
+ scheduler.completeOperation(op1);
+ Assert.assertTrue(active3.get());
+ Assert.assertFalse(active4.get());
+
+ // op3 completed, op4 is active
+ scheduler.completeOperation(op3);
+ Assert.assertTrue(active4.get());
+
+ synchronized (lock) {
+ lock.notifyAll();
+ }
+ scheduler.close();
+ }
+
+ private ILSMIOOperation mockMergeOperation(String index, long remainingPages, AtomicBoolean isActive)
+ throws HyracksDataException {
+ ILSMIOOperation mergeOp = Mockito.mock(ILSMIOOperation.class);
+ Mockito.when(mergeOp.getIndexIdentifier()).thenReturn(index);
+ Mockito.when(mergeOp.getIOOpertionType()).thenReturn(LSMIOOperationType.MERGE);
+ Mockito.when(mergeOp.getRemainingPages()).thenReturn(remainingPages);
+
+ Mockito.doAnswer(new Answer<Boolean>() {
+ @Override
+ public Boolean answer(InvocationOnMock invocation) throws Throwable {
+ return isActive.get();
+ }
+ }).when(mergeOp).isActive();
+ Mockito.doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ isActive.set(true);
+ return null;
+ }
+ }).when(mergeOp).resume();
+
+ Mockito.doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ isActive.set(false);
+ return null;
+ }
+ }).when(mergeOp).pause();
+
+ Mockito.doAnswer(new Answer<LSMIOOperationStatus>() {
+ @Override
+ public LSMIOOperationStatus answer(InvocationOnMock invocation) throws Throwable {
+ synchronized (lock) {
+ lock.wait();
+ }
+ return LSMIOOperationStatus.SUCCESS;
+ }
+ }).when(mergeOp).call();
+ return mergeOp;
+
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
index 1a882b8..e4a3b15 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
@@ -21,9 +21,11 @@
import java.io.IOException;
+import org.apache.hyracks.storage.am.common.TreeIndexTestUtils;
import org.apache.hyracks.storage.am.common.datagen.TupleGenerator;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
import org.apache.hyracks.storage.am.config.AccessMethodTestsConfig;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexLoadTest;
import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndex;
@@ -56,7 +58,9 @@
invIndex.activate();
}
// Perform merge.
- invIndexAccessor.scheduleMerge(((LSMInvertedIndex) invIndex).getDiskComponents());
+ ILSMIOOperation mergeOp = invIndexAccessor.scheduleMerge(((LSMInvertedIndex) invIndex).getDiskComponents());
+ mergeOp.addCompleteListener(op -> TreeIndexTestUtils.checkCursorStats(op));
+
validateAndCheckIndex(testCtx);
runTinySearchWorkload(testCtx, tupleGen);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
index 18ee1f6..8dc9b07 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
@@ -106,6 +106,7 @@
import org.apache.hyracks.storage.common.IIndexBulkLoader;
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.NoOpIndexCursorStats;
import org.apache.hyracks.storage.common.buffercache.NoOpPageWriteCallback;
import org.apache.hyracks.test.support.TestUtils;
import org.apache.hyracks.util.IThreadStats;
@@ -299,7 +300,7 @@
LSMInvertedIndexAccessor invIndexAccessor =
(LSMInvertedIndexAccessor) invIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
compareActualAndExpectedIndexesRangeSearch(testCtx,
- new LSMInvertedIndexMergeCursor(invIndexAccessor.getOpContext()));
+ new LSMInvertedIndexMergeCursor(invIndexAccessor.getOpContext(), NoOpIndexCursorStats.INSTANCE));
}
/**
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
index ee044bd..be37529 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
@@ -23,8 +23,10 @@
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.storage.am.common.TreeIndexTestUtils;
import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import org.apache.hyracks.storage.am.config.AccessMethodTestsConfig;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.rtree.impls.AbstractLSMRTree;
import org.apache.hyracks.storage.am.rtree.AbstractRTreeTestContext;
@@ -76,7 +78,8 @@
}
ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
- accessor.scheduleMerge(((AbstractLSMRTree) ctx.getIndex()).getDiskComponents());
+ ILSMIOOperation mergeOp = accessor.scheduleMerge(((AbstractLSMRTree) ctx.getIndex()).getDiskComponents());
+ mergeOp.addCompleteListener(op -> TreeIndexTestUtils.checkCursorStats(op));
rTreeTestUtils.checkScan(ctx);
rTreeTestUtils.checkDiskOrderScan(ctx);