split flush and merge into two-part calls to allow for IO scheduling
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1736 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITreeIndex.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITreeIndex.java
index 1965f3f..cb8443d 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITreeIndex.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITreeIndex.java
@@ -60,10 +60,10 @@
*/
public IBinaryComparatorFactory[] getComparatorFactories();
- /**
- * @param fillFactor
- * @throws TreeIndexException if the user tries to instantiate a second bulk
- * loader
- */
- public IIndexBulkLoader createBulkLoader(float fillFactor) throws TreeIndexException;
+ /**
+ * @param fillFactor
+ * @throws TreeIndexException if the user tries to instantiate a second bulk
+ * loader
+ */
+ public IIndexBulkLoader createBulkLoader(float fillFactor) throws TreeIndexException;
}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 4a3f7bf..c0fe5af 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -52,13 +52,17 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponentFinalizer;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMFlushOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMMergeOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexComponentFinalizer;
@@ -263,14 +267,15 @@
}
@Override
- public ITreeIndex flush() throws HyracksDataException, IndexException {
+ public ITreeIndex flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
+ LSMFlushOperation flushOp = (LSMFlushOperation) operation;
// Bulk load a new on-disk BTree from the in-memory BTree.
RangePredicate nullPred = new RangePredicate(null, null, true, true, null, null);
ITreeIndexAccessor memBTreeAccessor = memBTree.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
IIndexCursor scanCursor = memBTreeAccessor.createSearchCursor();
memBTreeAccessor.search(scanCursor, nullPred);
- BTree diskBTree = createFlushTarget();
+ BTree diskBTree = createDiskBTree(diskBTreeFactory, flushOp.getFlushTarget(), true);
// Bulk load the tuples from the in-memory BTree into the new disk BTree.
IIndexBulkLoader bulkLoader = diskBTree.createBulkLoader(1.0f);
try {
@@ -302,23 +307,6 @@
return createDiskBTree(bulkLoadBTreeFactory, fileRef, true);
}
- private BTree createFlushTarget() throws HyracksDataException {
- String relFlushFileName = (String) fileManager.getRelFlushFileName();
- FileReference fileRef = fileManager.createFlushFile(relFlushFileName);
- return createDiskBTree(diskBTreeFactory, fileRef, true);
- }
-
- private BTree createMergeTarget(List<Object> mergingDiskBTrees) throws HyracksDataException {
- BTree firstBTree = (BTree) mergingDiskBTrees.get(0);
- BTree lastBTree = (BTree) mergingDiskBTrees.get(mergingDiskBTrees.size() - 1);
- FileReference firstFile = diskFileMapProvider.lookupFileName(firstBTree.getFileId());
- FileReference lastFile = diskFileMapProvider.lookupFileName(lastBTree.getFileId());
- String relMergeFileName = (String) fileManager.getRelMergeFileName(firstFile.getFile().getName(), lastFile
- .getFile().getName());
- FileReference fileRef = fileManager.createMergeFile(relMergeFileName);
- return createDiskBTree(diskBTreeFactory, fileRef, true);
- }
-
private BTree createDiskBTree(TreeFactory<BTree> factory, FileReference fileRef, boolean createBTree)
throws HyracksDataException {
// Create new BTree instance.
@@ -365,15 +353,12 @@
lsmTreeCursor.initPriorityQueue();
}
- public ITreeIndex merge(List<Object> mergedComponents) throws HyracksDataException, IndexException {
- LSMBTreeOpContext ctx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor();
- RangePredicate rangePred = new RangePredicate(null, null, true, true, null, null);
- // Ordered scan, ignoring the in-memory BTree.
- // We get back a snapshot of the on-disk BTrees that are going to be
- // merged now, so we can clean them up after the merge has completed.
- List<Object> mergingDiskBTrees = lsmHarness.search(cursor, (RangePredicate) rangePred, ctx, false);
- mergedComponents.addAll(mergingDiskBTrees);
+ public ITreeIndex merge(List<Object> mergedComponents, ILSMIOOperation operation) throws HyracksDataException,
+ IndexException {
+ LSMMergeOperation mergeOp = (LSMMergeOperation) operation;
+ ITreeIndexCursor cursor = mergeOp.getCursor();
+
+ mergedComponents.addAll(mergeOp.getMergingComponents());
// Nothing to merge.
if (mergedComponents.size() <= 1) {
@@ -382,7 +367,7 @@
}
// Bulk load the tuples from all on-disk BTrees into the new BTree.
- BTree mergedBTree = createMergeTarget(mergingDiskBTrees);
+ BTree mergedBTree = createDiskBTree(diskBTreeFactory, mergeOp.getMergeTarget(), true);
IIndexBulkLoader bulkLoader = mergedBTree.createBulkLoader(1.0f);
try {
while (cursor.hasNext()) {
@@ -518,6 +503,41 @@
LSMBTreeOpContext concreteCtx = (LSMBTreeOpContext) ctx;
return concreteCtx.cmp;
}
+
+ @Override
+ public ILSMIOOperation createFlushOperation(ILSMIOOperationCallback callback) {
+ String relFlushFileName = (String) fileManager.getRelFlushFileName();
+ FileReference fileRef = fileManager.createFlushFile(relFlushFileName);
+ return new LSMFlushOperation(lsmHarness.getIndex(), fileRef, callback);
+ }
+ }
+
+ public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException {
+ LSMBTreeOpContext ctx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor();
+ RangePredicate rangePred = new RangePredicate(null, null, true, true, null, null);
+ // Ordered scan, ignoring the in-memory BTree.
+ // We get back a snapshot of the on-disk BTrees that are going to be
+ // merged now, so we can clean them up after the merge has completed.
+ List<Object> mergingDiskBTrees;
+ try {
+ mergingDiskBTrees = lsmHarness.search(cursor, (RangePredicate) rangePred, ctx, false);
+ if (mergingDiskBTrees.size() <= 1) {
+ cursor.close();
+ return null;
+ }
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
+ }
+
+ BTree firstBTree = (BTree) mergingDiskBTrees.get(0);
+ BTree lastBTree = (BTree) mergingDiskBTrees.get(mergingDiskBTrees.size() - 1);
+ FileReference firstFile = diskFileMapProvider.lookupFileName(firstBTree.getFileId());
+ FileReference lastFile = diskFileMapProvider.lookupFileName(lastBTree.getFileId());
+ String relMergeFileName = (String) fileManager.getRelMergeFileName(firstFile.getFile().getName(), lastFile
+ .getFile().getName());
+ FileReference fileRef = fileManager.createMergeFile(relMergeFileName);
+ return new LSMMergeOperation(lsmHarness.getIndex(), mergingDiskBTrees, cursor, fileRef, callback);
}
@Override
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index 95dea2f..a90b64e 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -43,13 +43,16 @@
public void search(IIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred, IIndexOpContext ictx,
boolean includeMemComponent, AtomicInteger searcherRefCount) throws HyracksDataException, IndexException;
- public Object merge(List<Object> mergedComponents) throws HyracksDataException, IndexException;
+ public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException;
+
+ public Object merge(List<Object> mergedComponents, ILSMIOOperation operation) throws HyracksDataException,
+ IndexException;
public void addMergedComponent(Object newComponent, List<Object> mergedComponents);
public void cleanUpAfterMerge(List<Object> mergedComponents) throws HyracksDataException;
- public Object flush() throws HyracksDataException, IndexException;
+ public Object flush(ILSMIOOperation operation) throws HyracksDataException, IndexException;
public void addFlushedComponent(Object index);
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index 6c10d26..b768cfd 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -20,6 +20,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMMergeInProgressException;
/**
* Client handle for performing operations
@@ -29,13 +30,18 @@
* concurrent operations).
*/
public interface ILSMIndexAccessor extends IIndexAccessor {
+ public ILSMIOOperation createFlushOperation(ILSMIOOperationCallback callback);
+
+ public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException,
+ LSMMergeInProgressException;
+
/**
* Force a flush of the in-memory component.
*
* @throws HyracksDataException
* @throws TreeIndexException
*/
- public void flush() throws HyracksDataException, IndexException;
+ public void flush(ILSMIOOperation operation) throws HyracksDataException, IndexException;
/**
* Merge all on-disk components.
@@ -43,7 +49,7 @@
* @throws HyracksDataException
* @throws TreeIndexException
*/
- public void merge() throws HyracksDataException, IndexException;
+ public void merge(ILSMIOOperation operation) throws HyracksDataException, IndexException;
/**
* Deletes the tuple from the memory component only.
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
index ae940e4..2a1f0e7 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
@@ -1,6 +1,8 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
public interface ILSMMergePolicy {
- public void diskComponentAdded(ILSMIndex index, int totalNumDiskComponents);
+ public void diskComponentAdded(ILSMIndex index, int totalNumDiskComponents) throws HyracksDataException;
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallback.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallback.java
new file mode 100644
index 0000000..0caef1a
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallback.java
@@ -0,0 +1,16 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+
+public class BlockingIOOperationCallback implements ILSMIOOperationCallback {
+
+ @Override
+ public void callback() {
+ this.notifyAll();
+ }
+
+ public void block() throws InterruptedException {
+ this.wait();
+ }
+
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
index 1f63cd3..8140b0e 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
@@ -1,7 +1,11 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
public class ConstantMergePolicy implements ILSMMergePolicy {
@@ -16,9 +20,19 @@
}
@Override
- public void diskComponentAdded(final ILSMIndex index, int totalNumDiskComponents) {
+ public void diskComponentAdded(final ILSMIndex index, int totalNumDiskComponents) throws HyracksDataException {
if (totalNumDiskComponents >= threshold) {
- ioScheduler.scheduleOperation(new LSMMergeOperation(index));
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ ILSMIOOperation op;
+ try {
+ op = accessor.createMergeOperation(NoOpIOOperationCallback.INSTANCE);
+ if (op != null) {
+ ioScheduler.scheduleOperation(op);
+ }
+ } catch (LSMMergeInProgressException e) {
+ // Do nothing
+ }
}
}
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMFlushOperation.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMFlushOperation.java
index 1368899..0c70bc1 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMFlushOperation.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMFlushOperation.java
@@ -4,6 +4,7 @@
import java.util.List;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IODeviceHandle;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -15,9 +16,13 @@
public class LSMFlushOperation implements ILSMIOOperation {
private final ILSMIndex index;
+ private final FileReference flushTarget;
+ private final ILSMIOOperationCallback callback;
- public LSMFlushOperation(ILSMIndex index) {
+ public LSMFlushOperation(ILSMIndex index, FileReference flushTarget, ILSMIOOperationCallback callback) {
this.index = index;
+ this.flushTarget = flushTarget;
+ this.callback = callback;
}
@Override
@@ -27,20 +32,23 @@
@Override
public List<IODeviceHandle> getWriteDevices() {
- // TODO Auto-generated method stub
- return null;
+ return Collections.singletonList(flushTarget.getDevideHandle());
}
@Override
public void perform() throws HyracksDataException, IndexException {
ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
- accessor.flush();
+ accessor.flush(this);
}
@Override
public ILSMIOOperationCallback getCallback() {
- return NoOpIOOperationCallback.INSTANCE;
+ return callback;
+ }
+
+ public FileReference getFlushTarget() {
+ return flushTarget;
}
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 2fe1093..44f305e 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -29,6 +29,8 @@
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
@@ -101,11 +103,11 @@
}
}
- public void flush() throws HyracksDataException, IndexException {
+ public void flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Flushing LSM-Tree.");
}
- Object newComponent = lsmIndex.flush();
+ Object newComponent = lsmIndex.flush(operation);
// The implementation of this call must take any necessary steps to make
// the new component permanent, and mark it as valid (usually this means
@@ -150,12 +152,21 @@
return diskComponentSnapshot;
}
- public void merge() throws HyracksDataException, IndexException {
+ public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws LSMMergeInProgressException,
+ HyracksDataException {
if (!isMerging.compareAndSet(false, true)) {
throw new LSMMergeInProgressException(
"Merge already in progress. Only one merge process allowed at a time.");
}
+ ILSMIOOperation mergeOp = lsmIndex.createMergeOperation(callback);
+ if (mergeOp == null) {
+ isMerging.set(false);
+ }
+ return mergeOp;
+ }
+
+ public void merge(ILSMIOOperation operation) throws HyracksDataException, IndexException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Merging LSM-Tree.");
}
@@ -165,7 +176,7 @@
AtomicInteger localSearcherRefCount = searcherRefCount;
List<Object> mergedComponents = new ArrayList<Object>();
- Object newComponent = lsmIndex.merge(mergedComponents);
+ Object newComponent = lsmIndex.merge(mergedComponents, operation);
// No merge happened.
if (newComponent == null) {
@@ -246,4 +257,8 @@
public ILSMIOOperationScheduler getIOScheduler() {
return ioScheduler;
}
+
+ public ILSMIndex getIndex() {
+ return lsmIndex;
+ }
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMMergeOperation.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMMergeOperation.java
index 2f82b7a..a38f82b 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMMergeOperation.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMMergeOperation.java
@@ -1,10 +1,15 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.impls.AbstractTreeIndex;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -14,33 +19,56 @@
public class LSMMergeOperation implements ILSMIOOperation {
private final ILSMIndex index;
+ private final List<Object> mergingComponents;
+ private final ITreeIndexCursor cursor;
+ private final FileReference mergeTarget;
+ private final ILSMIOOperationCallback callback;
- public LSMMergeOperation(ILSMIndex index) {
+ public LSMMergeOperation(ILSMIndex index, List<Object> mergingComponents, ITreeIndexCursor cursor,
+ FileReference mergeTarget, ILSMIOOperationCallback callback) {
this.index = index;
+ this.mergingComponents = mergingComponents;
+ this.cursor = cursor;
+ this.mergeTarget = mergeTarget;
+ this.callback = callback;
}
@Override
public List<IODeviceHandle> getReadDevices() {
- // TODO Auto-generated method stub
- return null;
+ List<IODeviceHandle> devs = new ArrayList<IODeviceHandle>();
+ for (Object o : mergingComponents) {
+ AbstractTreeIndex idx = (AbstractTreeIndex) o;
+ devs.add(idx.getFileReference().getDevideHandle());
+ }
+ return devs;
}
@Override
public List<IODeviceHandle> getWriteDevices() {
- // TODO Auto-generated method stub
- return null;
+ return Collections.singletonList(mergeTarget.getDevideHandle());
}
@Override
public void perform() throws HyracksDataException, IndexException {
ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
- accessor.merge();
+ accessor.merge(this);
}
@Override
public ILSMIOOperationCallback getCallback() {
- return NoOpIOOperationCallback.INSTANCE;
+ return callback;
}
+ public FileReference getMergeTarget() {
+ return mergeTarget;
+ }
+
+ public ITreeIndexCursor getCursor() {
+ return cursor;
+ }
+
+ public List<Object> getMergingComponents() {
+ return mergingComponents;
+ }
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index 1630d2c..8676b27 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -22,6 +22,8 @@
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
public abstract class LSMTreeIndexAccessor implements ILSMIndexAccessor {
@@ -51,7 +53,7 @@
ctx.reset(IndexOp.DELETE);
lsmHarness.insertUpdateOrDelete(tuple, ctx);
}
-
+
@Override
public void upsert(ITupleReference tuple) throws HyracksDataException, IndexException {
ctx.reset(IndexOp.UPSERT);
@@ -65,18 +67,24 @@
}
@Override
- public void flush() throws HyracksDataException, IndexException {
- lsmHarness.flush();
+ public void flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
+ lsmHarness.flush(operation);
}
@Override
- public void merge() throws HyracksDataException, IndexException {
- lsmHarness.merge();
+ public void merge(ILSMIOOperation operation) throws HyracksDataException, IndexException {
+ lsmHarness.merge(operation);
}
-
+
@Override
public void physicalDelete(ITupleReference tuple) throws HyracksDataException, IndexException {
ctx.reset(IndexOp.PHYSICALDELETE);
lsmHarness.insertUpdateOrDelete(tuple, ctx);
}
+
+ @Override
+ public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException,
+ LSMMergeInProgressException {
+ return lsmHarness.createMergeOperation(callback);
+ }
}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTracker.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTracker.java
index 12dc42d..cd1d394 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTracker.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/RefCountingOperationTracker.java
@@ -1,6 +1,8 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
public class RefCountingOperationTracker implements ILSMOperationTracker {
@@ -34,7 +36,10 @@
// Flush will only be handled by last exiting thread.
if (index.getFlushController().getFlushStatus(index) && threadRefCount == 0) {
- index.getIOScheduler().scheduleOperation(new LSMFlushOperation(index));
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ index.getIOScheduler().scheduleOperation(
+ accessor.createFlushOperation(NoOpIOOperationCallback.INSTANCE));
}
}
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index d6ba98f..59e74ba 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -43,6 +43,8 @@
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -183,7 +185,8 @@
}
@Override
- public Object flush() throws HyracksDataException, IndexException {
+ public Object flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
+ LSMRTreeFlushOperation flushOp = (LSMRTreeFlushOperation) operation;
// Renaming order is critical because we use assume ordering when we
// read the file names when we open the tree.
// The RTree should be renamed before the BTree.
@@ -194,9 +197,7 @@
RTreeSearchCursor rtreeScanCursor = (RTreeSearchCursor) memRTreeAccessor.createSearchCursor();
SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
- LSMRTreeFileNameComponent fileNames = (LSMRTreeFileNameComponent) fileManager.getRelFlushFileName();
- FileReference rtreeFile = fileManager.createFlushFile(fileNames.getRTreeFileName());
- RTree diskRTree = (RTree) createDiskTree(diskRTreeFactory, rtreeFile, true);
+ RTree diskRTree = (RTree) createDiskTree(diskRTreeFactory, flushOp.getRTreeFlushTarget(), true);
IIndexBulkLoader rTreeBulkloader;
ITreeIndexCursor cursor;
@@ -247,8 +248,7 @@
IIndexCursor btreeScanCursor = memBTreeAccessor.createSearchCursor();
RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null, null);
memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate);
- FileReference btreeFile = fileManager.createFlushFile(fileNames.getBTreeFileName());
- BTree diskBTree = (BTree) createDiskTree(diskBTreeFactory, btreeFile, true);
+ BTree diskBTree = (BTree) createDiskTree(diskBTreeFactory, flushOp.getBTreeFlushTarget(), true);
// BulkLoad the tuples from the in-memory tree into the new disk BTree.
IIndexBulkLoader bTreeBulkloader = diskBTree.createBulkLoader(1.0f);
@@ -266,18 +266,11 @@
}
@Override
- public Object merge(List<Object> mergedComponents) throws HyracksDataException, IndexException {
- // Renaming order is critical because we use assume ordering when we
- // read the file names when we open the tree.
- // The RTree should be renamed before the BTree.
-
- IIndexOpContext ctx = createOpContext();
- ITreeIndexCursor cursor;
- cursor = new LSMRTreeSortedCursor(linearizer);
- ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
- // Scan the RTrees, ignoring the in-memory RTree.
- List<Object> mergingComponents = lsmHarness.search(cursor, rtreeSearchPred, ctx, false);
- mergedComponents.addAll(mergingComponents);
+ public Object merge(List<Object> mergedComponents, ILSMIOOperation operation) throws HyracksDataException,
+ IndexException {
+ LSMRTreeMergeOperation mergeOp = (LSMRTreeMergeOperation) operation;
+ ITreeIndexCursor cursor = mergeOp.getCursor();
+ mergedComponents.addAll(mergeOp.getMergingComponents());
// Nothing to merge.
if (mergedComponents.size() <= 1) {
@@ -286,11 +279,8 @@
}
// Bulk load the tuples from all on-disk RTrees into the new RTree.
- LSMRTreeFileNameComponent fileNames = getMergeTargetFileName(mergingComponents);
- FileReference rtreeFile = fileManager.createMergeFile(fileNames.getRTreeFileName());
- FileReference btreeFile = fileManager.createMergeFile(fileNames.getBTreeFileName());
- RTree mergedRTree = (RTree) createDiskTree(diskRTreeFactory, rtreeFile, true);
- BTree mergedBTree = (BTree) createDiskTree(diskBTreeFactory, btreeFile, true);
+ RTree mergedRTree = (RTree) createDiskTree(diskRTreeFactory, mergeOp.getRTreeMergeTarget(), true);
+ BTree mergedBTree = (BTree) createDiskTree(diskBTreeFactory, mergeOp.getBTreeMergeTarget(), true);
IIndexBulkLoader bulkloader = mergedRTree.createBulkLoader(1.0f);
try {
@@ -326,6 +316,35 @@
}
@Override
+ public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException {
+ // Renaming order is critical because we use assume ordering when we
+ // read the file names when we open the tree.
+ // The RTree should be renamed before the BTree.
+ IIndexOpContext ctx = createOpContext();
+ ITreeIndexCursor cursor;
+ cursor = new LSMRTreeSortedCursor(linearizer);
+ ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
+ // Scan the RTrees, ignoring the in-memory RTree.
+ List<Object> mergingComponents;
+ try {
+ mergingComponents = lsmHarness.search(cursor, rtreeSearchPred, ctx, false);
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
+ }
+ // Nothing to merge.
+ if (mergingComponents.size() <= 1) {
+ cursor.close();
+ return null;
+ }
+ LSMRTreeFileNameComponent fileNames = getMergeTargetFileName(mergingComponents);
+ FileReference rtreeFile = fileManager.createMergeFile(fileNames.getRTreeFileName());
+ FileReference btreeFile = fileManager.createMergeFile(fileNames.getBTreeFileName());
+
+ return new LSMRTreeMergeOperation(lsmHarness.getIndex(), mergingComponents, cursor, rtreeFile, btreeFile,
+ callback);
+ }
+
+ @Override
public IIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
ISearchOperationCallback searchCallback) {
return new LSMRTreeAccessor(lsmHarness, createOpContext());
@@ -345,6 +364,14 @@
LSMRTreeOpContext concreteCtx = (LSMRTreeOpContext) ctx;
return concreteCtx.rtreeOpContext.cmp;
}
+
+ @Override
+ public ILSMIOOperation createFlushOperation(ILSMIOOperationCallback callback) {
+ LSMRTreeFileNameComponent fileNames = (LSMRTreeFileNameComponent) fileManager.getRelFlushFileName();
+ FileReference rtreeFile = fileManager.createFlushFile(fileNames.getRTreeFileName());
+ FileReference btreeFile = fileManager.createFlushFile(fileNames.getBTreeFileName());
+ return new LSMRTreeFlushOperation(lsmHarness.getIndex(), rtreeFile, btreeFile, callback);
+ }
}
@Override
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
new file mode 100644
index 0000000..5ec4776
--- /dev/null
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
@@ -0,0 +1,60 @@
+package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+
+import java.util.Collections;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+
+public class LSMRTreeFlushOperation implements ILSMIOOperation {
+
+ private final ILSMIndex index;
+ private final FileReference rtreeFlushTarget;
+ private final FileReference btreeFlushTarget;
+ private final ILSMIOOperationCallback callback;
+
+ public LSMRTreeFlushOperation(ILSMIndex index, FileReference rtreeFlushTarget, FileReference btreeFlushTarget,
+ ILSMIOOperationCallback callback) {
+ this.index = index;
+ this.rtreeFlushTarget = rtreeFlushTarget;
+ this.btreeFlushTarget = btreeFlushTarget;
+ this.callback = callback;
+ }
+
+ @Override
+ public List<IODeviceHandle> getReadDevices() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<IODeviceHandle> getWriteDevices() {
+ return Collections.singletonList(rtreeFlushTarget.getDevideHandle());
+ }
+
+ @Override
+ public void perform() throws HyracksDataException, IndexException {
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ accessor.flush(this);
+ }
+
+ @Override
+ public ILSMIOOperationCallback getCallback() {
+ return callback;
+ }
+
+ public FileReference getRTreeFlushTarget() {
+ return rtreeFlushTarget;
+ }
+
+ public FileReference getBTreeFlushTarget() {
+ return btreeFlushTarget;
+ }
+}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
new file mode 100644
index 0000000..e55f2c1
--- /dev/null
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
@@ -0,0 +1,80 @@
+package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.AbstractLSMRTree.LSMRTreeComponent;
+
+public class LSMRTreeMergeOperation implements ILSMIOOperation {
+ private final ILSMIndex index;
+ private final List<Object> mergingComponents;
+ private final ITreeIndexCursor cursor;
+ private final FileReference rtreeMergeTarget;
+ private final FileReference btreeMergeTarget;
+ private final ILSMIOOperationCallback callback;
+
+ public LSMRTreeMergeOperation(ILSMIndex index, List<Object> mergingComponents, ITreeIndexCursor cursor,
+ FileReference rtreeMergeTarget, FileReference btreeMergeTarget, ILSMIOOperationCallback callback) {
+ this.index = index;
+ this.mergingComponents = mergingComponents;
+ this.cursor = cursor;
+ this.rtreeMergeTarget = rtreeMergeTarget;
+ this.btreeMergeTarget = btreeMergeTarget;
+ this.callback = callback;
+ }
+
+ @Override
+ public List<IODeviceHandle> getReadDevices() {
+ List<IODeviceHandle> devs = new ArrayList<IODeviceHandle>();
+ for (Object o : mergingComponents) {
+ LSMRTreeComponent component = (LSMRTreeComponent) o;
+ devs.add(component.getRTree().getFileReference().getDevideHandle());
+ }
+ return devs;
+ }
+
+ @Override
+ public List<IODeviceHandle> getWriteDevices() {
+ return Collections.singletonList(rtreeMergeTarget.getDevideHandle());
+ }
+
+ @Override
+ public void perform() throws HyracksDataException, IndexException {
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ accessor.merge(this);
+ }
+
+ @Override
+ public ILSMIOOperationCallback getCallback() {
+ return callback;
+ }
+
+ public FileReference getRTreeMergeTarget() {
+ return rtreeMergeTarget;
+ }
+
+ public FileReference getBTreeMergeTarget() {
+ return btreeMergeTarget;
+ }
+
+ public ITreeIndexCursor getCursor() {
+ return cursor;
+ }
+
+ public List<Object> getMergingComponents() {
+ return mergingComponents;
+ }
+
+}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 9e1bdb3..2f94339 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -44,11 +44,15 @@
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMFlushOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMMergeOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexComponentFinalizer;
@@ -134,23 +138,6 @@
super.clear();
}
- private RTree createFlushTarget() throws HyracksDataException {
- String relFlushFileName = (String) fileManager.getRelFlushFileName();
- FileReference fileRef = fileManager.createFlushFile(relFlushFileName);
- return (RTree) createDiskTree(diskRTreeFactory, fileRef, true);
- }
-
- private RTree createMergeTarget(List<Object> mergingDiskRTrees) throws HyracksDataException {
- RTree firstRTree = (RTree) mergingDiskRTrees.get(0);
- RTree lastRTree = (RTree) mergingDiskRTrees.get(mergingDiskRTrees.size() - 1);
- FileReference firstFile = diskFileMapProvider.lookupFileName(firstRTree.getFileId());
- FileReference lastFile = diskFileMapProvider.lookupFileName(lastRTree.getFileId());
- String relMergeFileName = (String) fileManager.getRelMergeFileName(firstFile.getFile().getName(), lastFile
- .getFile().getName());
- FileReference fileRef = fileManager.createMergeFile(relMergeFileName);
- return (RTree) createDiskTree(diskRTreeFactory, fileRef, true);
- }
-
public void search(IIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred, IIndexOpContext ictx,
boolean includeMemComponent, AtomicInteger searcherRefCount) throws HyracksDataException, IndexException {
LSMRTreeOpContext ctx = (LSMRTreeOpContext) ictx;
@@ -191,7 +178,8 @@
}
@Override
- public ITreeIndex flush() throws HyracksDataException, IndexException {
+ public ITreeIndex flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
+ LSMFlushOperation flushOp = (LSMFlushOperation) operation;
// Renaming order is critical because we use assume ordering when we
// read the file names when we open the tree.
// The RTree should be renamed before the BTree.
@@ -202,7 +190,7 @@
RTreeSearchCursor rtreeScanCursor = (RTreeSearchCursor) memRTreeAccessor.createSearchCursor();
SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
- RTree diskRTree = createFlushTarget();
+ RTree diskRTree = (RTree) createDiskTree(diskRTreeFactory, flushOp.getFlushTarget(), true);
// scan the memory BTree
ITreeIndexAccessor memBTreeAccessor = memComponent.getBTree().createAccessor(NoOpOperationCallback.INSTANCE,
@@ -282,15 +270,11 @@
}
@Override
- public ITreeIndex merge(List<Object> mergedComponents) throws HyracksDataException, IndexException {
- LSMRTreeOpContext ctx = createOpContext();
- ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor();
- ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
- // Ordered scan, ignoring the in-memory RTree.
- // We get back a snapshot of the on-disk RTrees that are going to be
- // merged now, so we can clean them up after the merge has completed.
- List<Object> mergingDiskRTrees = lsmHarness.search(cursor, (SearchPredicate) rtreeSearchPred, ctx, false);
- mergedComponents.addAll(mergingDiskRTrees);
+ public ITreeIndex merge(List<Object> mergedComponents, ILSMIOOperation operation) throws HyracksDataException,
+ IndexException {
+ LSMMergeOperation mergeOp = (LSMMergeOperation) operation;
+ ITreeIndexCursor cursor = mergeOp.getCursor();
+ mergedComponents.addAll(mergeOp.getMergingComponents());
// Nothing to merge.
if (mergedComponents.size() <= 1) {
@@ -299,7 +283,7 @@
}
// Bulk load the tuples from all on-disk RTrees into the new RTree.
- RTree mergedRTree = createMergeTarget(mergingDiskRTrees);
+ RTree mergedRTree = (RTree) createDiskTree(diskRTreeFactory, mergeOp.getMergeTarget(), true);
IIndexBulkLoader bulkloader = mergedRTree.createBulkLoader(1.0f);
try {
while (cursor.hasNext()) {
@@ -344,6 +328,13 @@
LSMRTreeOpContext concreteCtx = (LSMRTreeOpContext) ctx;
return concreteCtx.rtreeOpContext.cmp;
}
+
+ @Override
+ public ILSMIOOperation createFlushOperation(ILSMIOOperationCallback callback) {
+ String relFlushFileName = (String) fileManager.getRelFlushFileName();
+ FileReference fileRef = fileManager.createFlushFile(relFlushFileName);
+ return new LSMFlushOperation(lsmHarness.getIndex(), fileRef, callback);
+ }
}
@Override
@@ -384,4 +375,33 @@
}
}
+
+ @Override
+ public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException {
+ LSMRTreeOpContext ctx = createOpContext();
+ ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor();
+ ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
+ // Ordered scan, ignoring the in-memory RTree.
+ // We get back a snapshot of the on-disk RTrees that are going to be
+ // merged now, so we can clean them up after the merge has completed.
+ List<Object> mergingDiskRTrees;
+ try {
+ mergingDiskRTrees = lsmHarness.search(cursor, (SearchPredicate) rtreeSearchPred, ctx, false);
+ if (mergingDiskRTrees.size() <= 1) {
+ cursor.close();
+ return null;
+ }
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
+ }
+
+ RTree firstRTree = (RTree) mergingDiskRTrees.get(0);
+ RTree lastRTree = (RTree) mergingDiskRTrees.get(mergingDiskRTrees.size() - 1);
+ FileReference firstFile = diskFileMapProvider.lookupFileName(firstRTree.getFileId());
+ FileReference lastFile = diskFileMapProvider.lookupFileName(lastRTree.getFileId());
+ String relMergeFileName = (String) fileManager.getRelMergeFileName(firstFile.getFile().getName(), lastFile
+ .getFile().getName());
+ FileReference fileRef = fileManager.createMergeFile(relMergeFileName);
+ return new LSMMergeOperation(lsmHarness.getIndex(), mergingDiskRTrees, cursor, fileRef, callback);
+ }
}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
index 4d471b8..3add02e 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
@@ -24,7 +24,9 @@
import edu.uci.ics.hyracks.storage.am.btree.OrderedIndexTestUtils;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
@SuppressWarnings("rawtypes")
public abstract class LSMBTreeMergeTestDriver extends OrderedIndexTestDriver {
@@ -63,7 +65,10 @@
}
ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
- accessor.merge();
+ ILSMIOOperation ioop = accessor.createMergeOperation(NoOpIOOperationCallback.INSTANCE);
+ if (ioop != null) {
+ accessor.merge(ioop);
+ }
orderedIndexTestUtils.checkPointSearches(ctx);
orderedIndexTestUtils.checkScan(ctx);
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
index 0cbfb64..377b959 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
@@ -33,29 +33,31 @@
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree;
import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree.LSMBTreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMMergeInProgressException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
public class LSMBTreeTestWorker extends AbstractTreeIndexTestWorker {
-
+
private final LSMBTree lsmBTree;
private final int numKeyFields;
private final ArrayTupleBuilder deleteTb;
private final ArrayTupleReference deleteTuple = new ArrayTupleReference();
-
+
public LSMBTreeTestWorker(DataGenThread dataGen, TestOperationSelector opSelector, ITreeIndex index, int numBatches) {
super(dataGen, opSelector, index, numBatches);
lsmBTree = (LSMBTree) index;
numKeyFields = lsmBTree.getComparatorFactories().length;
deleteTb = new ArrayTupleBuilder(numKeyFields);
}
-
+
@Override
- public void performOp(ITupleReference tuple, TestOperation op) throws HyracksDataException, IndexException {
+ public void performOp(ITupleReference tuple, TestOperation op) throws HyracksDataException, IndexException {
LSMBTreeIndexAccessor accessor = (LSMBTreeIndexAccessor) indexAccessor;
IIndexCursor searchCursor = accessor.createSearchCursor();
MultiComparator cmp = accessor.getMultiComparator();
RangePredicate rangePred = new RangePredicate(tuple, tuple, true, true, cmp, cmp);
-
+
switch (op) {
case INSERT:
try {
@@ -64,7 +66,7 @@
// Ignore duplicate keys, since we get random tuples.
}
break;
-
+
case DELETE:
// Create a tuple reference with only key fields.
deleteTb.reset();
@@ -78,8 +80,8 @@
// Ignore non-existant keys, since we get random tuples.
}
break;
-
- case UPDATE:
+
+ case UPDATE:
try {
accessor.update(tuple);
} catch (BTreeNonExistentKeyException e) {
@@ -88,15 +90,15 @@
// Ignore not updateable exception due to numKeys == numFields.
}
break;
-
- case POINT_SEARCH:
+
+ case POINT_SEARCH:
searchCursor.reset();
rangePred.setLowKey(tuple, true);
rangePred.setHighKey(tuple, true);
accessor.search(searchCursor, rangePred);
consumeCursorTuples(searchCursor);
break;
-
+
case SCAN:
searchCursor.reset();
rangePred.setLowKey(null, true);
@@ -104,10 +106,13 @@
accessor.search(searchCursor, rangePred);
consumeCursorTuples(searchCursor);
break;
-
+
case MERGE:
try {
- accessor.merge();
+ ILSMIOOperation ioop = accessor.createMergeOperation(NoOpIOOperationCallback.INSTANCE);
+ if (ioop != null) {
+ accessor.merge(ioop);
+ }
} catch (LSMMergeInProgressException e) {
// Ignore ongoing merges. Do an insert instead.
try {
@@ -117,7 +122,7 @@
}
}
break;
-
+
default:
throw new HyracksDataException("Op " + op.toString() + " not supported.");
}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
index da66966..36c3c5c 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
@@ -21,7 +21,9 @@
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeTestContext;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeTestDriver;
import edu.uci.ics.hyracks.storage.am.rtree.RTreeTestUtils;
@@ -33,7 +35,7 @@
private final RTreeTestUtils rTreeTestUtils;
public LSMRTreeMergeTestDriver(boolean testRstarPolicy) {
- super(testRstarPolicy);
+ super(testRstarPolicy);
this.rTreeTestUtils = new RTreeTestUtils();
}
@@ -65,7 +67,10 @@
}
ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
- accessor.merge();
+ ILSMIOOperation mergeOperation = accessor.createMergeOperation(NoOpIOOperationCallback.INSTANCE);
+ if (mergeOperation != null) {
+ accessor.merge(mergeOperation);
+ }
rTreeTestUtils.checkScan(ctx);
rTreeTestUtils.checkDiskOrderScan(ctx);
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
index bcbcf71..59a7728 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
@@ -27,7 +27,9 @@
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.datagen.DataGenThread;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMMergeInProgressException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTree;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTree.LSMRTreeAccessor;
import edu.uci.ics.hyracks.storage.am.rtree.impls.SearchPredicate;
@@ -73,7 +75,10 @@
case MERGE:
try {
- accessor.merge();
+ ILSMIOOperation mergeOperation = accessor.createMergeOperation(NoOpIOOperationCallback.INSTANCE);
+ if (mergeOperation != null) {
+ accessor.merge(mergeOperation);
+ }
} catch (LSMMergeInProgressException e) {
// Ignore ongoing merges. Do an insert instead.
rearrangeTuple(tuple, cmp);
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
index 8c5613a..0f8cdec 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
@@ -24,7 +24,9 @@
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.datagen.DataGenThread;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMMergeInProgressException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeWithAntiMatterTuples.LSMRTreeWithAntiMatterTuplesAccessor;
import edu.uci.ics.hyracks.storage.am.rtree.impls.SearchPredicate;
@@ -62,7 +64,10 @@
case MERGE:
try {
- accessor.merge();
+ ILSMIOOperation mergeOperation = accessor.createMergeOperation(NoOpIOOperationCallback.INSTANCE);
+ if (mergeOperation != null) {
+ accessor.merge(mergeOperation);
+ }
} catch (LSMMergeInProgressException e) {
// Ignore ongoing merges. Do an insert instead.
rearrangeTuple(tuple, cmp);