[ASTERIXDB-2103][STO] Too many disk components for CorrelatedPolicy
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Currently CorrelatedMergePolicy uses component Ids to ensure disk
components of primary and secondary indexes are merged together,
but without synchronization. However, this results in too many disk
components for secondary InvertedIndex. The reason is that secondary
index could miss some round of merges, if the merge policy finds out
the corresponding secondary components are not available (either being
merged or being flushed). Even though flow-control on secondary indexes
can guarantee the secondary index would catch up the next time, it is
still possible that the primary component is finialized, which leaves
the secondary components which miss this round of merge are never merged
again.
This patch fixes this bug by:
- Add the mechanism of depending operations to LSM IO operation. An
operation finishes only after all depending operations have finished.
- For correlated merge policy, the flush/merge of the primary index depends
on all flushes/merges of secondary indexes. This ensures when the
correlated policy schedules merge, all related components of all indexes
are available to merge.
Change-Id: Ib6c06ee23f3bfd16b758802388389c00e29780b1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2018
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Jianfeng Jia <jianfeng.jia@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index 76bec8c..5e0e072 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -132,9 +132,9 @@
public void createIndex() throws Exception {
List<List<String>> partitioningKeys = new ArrayList<>();
partitioningKeys.add(Collections.singletonList("key"));
- dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
- NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
- partitioningKeys, null, null, null, false, null, false),
+ dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME, null,
+ null, new InternalDatasetDetails(null, PartitioningStrategy.HASH, partitioningKeys, null, null, null,
+ false, null, false),
null, DatasetType.INTERNAL, DATASET_ID, 0);
PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,
new NoMergePolicyFactory(), null, null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
@@ -448,7 +448,7 @@
for (int i = 0; i < numMergedComponents; i++) {
mergedComponents.add(diskComponents.get(i));
}
- mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents);
+ mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents, null);
merger.waitUntilCount(1);
// now that we enetered, we will rollback
Rollerback rollerback = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn));
@@ -635,7 +635,7 @@
for (int i = 0; i < numMergedComponents; i++) {
mergedComponents.add(diskComponents.get(i));
}
- mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents);
+ mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents, null);
merger.waitUntilCount(1);
// we will block search
lsmBtree.clearSearchCallbacks();
@@ -707,7 +707,7 @@
for (int i = 0; i < numMergedComponents; i++) {
mergedComponents.add(diskComponents.get(i));
}
- mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents);
+ mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents, null);
merger.waitUntilCount(1);
// we will block search
lsmBtree.clearSearchCallbacks();
@@ -736,7 +736,7 @@
}
private class Rollerback {
- private Thread task;
+ private final Thread task;
private Exception failure;
public Rollerback(TestLsmBtree lsmBtree, Predicate<ILSMComponent> predicate) {
@@ -766,7 +766,7 @@
}
private class Searcher {
- private ExecutorService executor = Executors.newSingleThreadExecutor();
+ private final ExecutorService executor = Executors.newSingleThreadExecutor();
private Future<Boolean> task;
private volatile boolean entered = false;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
index a20e660..e18181c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
@@ -31,6 +31,7 @@
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicy;
@@ -91,14 +92,17 @@
Set<IndexInfo> indexInfos = datasetLifecycleManager.getDatasetInfo(datasetId).getDatsetIndexInfos();
int partition = getIndexPartition(index, indexInfos);
- triggerScheduledMerge(minID, maxID,
+ List<ILSMIOOperation> dependingMerges = scheduleSecondaryIndexes(minID, maxID,
indexInfos.stream().filter(info -> info.getPartition() == partition).collect(Collectors.toSet()));
+
+ schedulePrimaryIndex(minID, maxID, index, dependingMerges);
+
return true;
}
/**
* Submit merge requests for all disk components within [minID, maxID]
- * of all indexes of a given dataset in the given partition
+ * of all of secondary indexes of a given dataset in the given partition
*
* @param minID
* @param maxID
@@ -106,17 +110,39 @@
* @param indexInfos
* @throws HyracksDataException
*/
- private void triggerScheduledMerge(long minID, long maxID, Set<IndexInfo> indexInfos) throws HyracksDataException {
+ private List<ILSMIOOperation> scheduleSecondaryIndexes(long minID, long maxID, Set<IndexInfo> indexInfos)
+ throws HyracksDataException {
+ List<ILSMIOOperation> mergeOps = new ArrayList<>();
for (IndexInfo info : indexInfos) {
ILSMIndex lsmIndex = info.getIndex();
-
- List<ILSMDiskComponent> immutableComponents = new ArrayList<>(lsmIndex.getDiskComponents());
- if (isMergeOngoing(immutableComponents)) {
+ List<ILSMDiskComponent> diskComponents = lsmIndex.getDiskComponents();
+ if (lsmIndex.isPrimaryIndex() || isMergeOngoing(diskComponents)) {
continue;
}
- List<ILSMDiskComponent> mergableComponents = new ArrayList<>();
- for (ILSMDiskComponent component : immutableComponents) {
- ILSMDiskComponentId id = component.getComponentId();
+ List<ILSMDiskComponent> mergeableComponents = collectMergeableComponents(minID, maxID, diskComponents);
+ ILSMIndexAccessor accessor =
+ lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ mergeOps.add(accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergeableComponents, null));
+ }
+ return mergeOps;
+ }
+
+ private void schedulePrimaryIndex(long minID, long maxID, ILSMIndex primaryIndex,
+ List<ILSMIOOperation> dependingMerges) throws HyracksDataException {
+ assert primaryIndex.isPrimaryIndex();
+ List<ILSMDiskComponent> diskComponents = primaryIndex.getDiskComponents();
+ List<ILSMDiskComponent> mergeableComponents = collectMergeableComponents(minID, maxID, diskComponents);
+ ILSMIndexAccessor accessor =
+ primaryIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ accessor.scheduleMerge(primaryIndex.getIOOperationCallback(), mergeableComponents, dependingMerges);
+ }
+
+ private List<ILSMDiskComponent> collectMergeableComponents(long minID, long maxID,
+ List<ILSMDiskComponent> diskComponents) throws HyracksDataException {
+ List<ILSMDiskComponent> mergableComponents = new ArrayList<>();
+ for (ILSMDiskComponent component : diskComponents) {
+ ILSMDiskComponentId id = component.getComponentId();
+ if (!id.notFound()) {
if (id.getMinId() >= minID && id.getMaxId() <= maxID) {
mergableComponents.add(component);
}
@@ -126,10 +152,8 @@
break;
}
}
- ILSMIndexAccessor accessor =
- lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergableComponents);
}
+ return mergableComponents;
}
private int getIndexPartition(ILSMIndex index, Set<IndexInfo> indexInfos) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 71d4a96..0df8dcc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -34,6 +34,7 @@
private boolean isRegistered;
private boolean memoryAllocated;
private boolean durable;
+ private boolean correlated;
public DatasetInfo(int datasetID) {
this.indexes = new HashMap<>();
@@ -41,6 +42,7 @@
this.datasetID = datasetID;
this.setRegistered(false);
this.setMemoryAllocated(false);
+ this.setCorrelated(false);
}
@Override
@@ -195,4 +197,12 @@
public void setLastAccess(long lastAccess) {
this.lastAccess = lastAccess;
}
+
+ public void setCorrelated(boolean correlated) {
+ this.correlated = correlated;
+ }
+
+ public boolean isCorrelated() {
+ return correlated;
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 37bd789..ad9f6a5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -223,7 +223,7 @@
if (iInfo.isOpen()) {
ILSMIndexAccessor accessor =
iInfo.getIndex().createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback());
+ accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback(), null);
}
// Wait for the above flush op.
@@ -417,16 +417,22 @@
}
if (asyncFlush) {
- for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
- ILSMIndexAccessor accessor =
- iInfo.getIndex().createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback());
- }
+ PrimaryIndexOperationTracker.flushDatasetIndexes(dsInfo.getDatsetIndexInfos(), dsInfo.isCorrelated());
} else {
+ List<IndexInfo> primaryIndexes = new ArrayList<>();
for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
- // TODO: This is not efficient since we flush the indexes sequentially.
- // Think of a way to allow submitting the flush requests concurrently. We don't do them concurrently because this
- // may lead to a deadlock scenario between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker.
+ if (iInfo.getIndex().isPrimaryIndex()) {
+ // primary indexes are flushed later to guarantee the correctness of the correlated merge policy
+ primaryIndexes.add(iInfo);
+ } else {
+ // TODO: This is not efficient since we flush the indexes sequentially.
+ // Think of a way to allow submitting the flush requests concurrently.
+ // We don't do them concurrently because this may lead to a deadlock scenario
+ // between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker.
+ flushAndWaitForIO(dsInfo, iInfo);
+ }
+ }
+ for (IndexInfo iInfo : primaryIndexes) {
flushAndWaitForIO(dsInfo, iInfo);
}
}
@@ -591,4 +597,5 @@
}
}
}
+
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index f2f3b93..5eb3c02 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -23,6 +23,7 @@
import org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.LocalResource;
@@ -95,6 +96,10 @@
datasetInfo.setExternal(!index.hasMemoryComponents());
datasetInfo.setRegistered(true);
datasetInfo.setDurable(((ILSMIndex) index).isDurable());
+ //TODO use a general mechanism to set correlated property when we have more
+ // correlated merge policies
+ datasetInfo.setCorrelated(
+ ((AbstractLSMIndex) index).getMergePolicy() instanceof CorrelatedPrefixMergePolicy);
}
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 67b25b6..4eb1c3a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -19,6 +19,9 @@
package org.apache.asterix.common.context;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -31,6 +34,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
@@ -141,17 +145,16 @@
//This method is called sequentially by LogPage.notifyFlushTerminator in the sequence flushes were scheduled.
public synchronized void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException {
- for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) {
- //get resource
- ILSMIndexAccessor accessor =
- lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ Set<IndexInfo> indexInfos = dsInfo.getDatsetIndexInfos();
+ for (IndexInfo iInfo : indexInfos) {
//update resource lsn
AbstractLSMIOOperationCallback ioOpCallback =
- (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback();
+ (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback();
ioOpCallback.updateLastLSN(logRecord.getLSN());
- //schedule flush after update
- accessor.scheduleFlush(lsmIndex.getIOOperationCallback());
}
+
+ flushDatasetIndexes(indexInfos, dsInfo.isCorrelated());
+
flushLogCreated = false;
}
@@ -198,4 +201,63 @@
return flushLogCreated;
}
+ public static void flushDatasetIndexes(Set<IndexInfo> indexes, boolean correlated) throws HyracksDataException {
+ if (!correlated) {
+ // if not correlated, we simply schedule flushes of each index independently
+ for (IndexInfo iInfo : indexes) {
+ ILSMIndex lsmIndex = iInfo.getIndex();
+ //get resource
+ ILSMIndexAccessor accessor =
+ lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ //schedule flush after update
+ accessor.scheduleFlush(lsmIndex.getIOOperationCallback(), null);
+ }
+ } else {
+ // otherwise, we need to schedule indexes properly s.t. the primary index would depend on
+ // all secondary indexes in the same partition
+
+ // collect partitions
+ Set<Integer> partitions = new HashSet<>();
+ indexes.forEach(iInfo -> partitions.add(iInfo.getPartition()));
+ for (Integer partition : partitions) {
+ flushCorrelatedDatasetIndexes(indexes, partition);
+ }
+
+ }
+ }
+
+ private static void flushCorrelatedDatasetIndexes(Set<IndexInfo> indexes, int partition)
+ throws HyracksDataException {
+ ILSMIndex primaryIndex = null;
+ List<ILSMIOOperation> flushOps = new ArrayList<>();
+ for (IndexInfo iInfo : indexes) {
+ if (iInfo.getPartition() != partition) {
+ continue;
+ }
+ ILSMIndex lsmIndex = iInfo.getIndex();
+ if (lsmIndex.isPrimaryIndex()) {
+ primaryIndex = lsmIndex;
+ } else {
+ //get resource
+ ILSMIndexAccessor accessor =
+ lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ //schedule flush
+ ILSMIOOperation flushOp = accessor.scheduleFlush(lsmIndex.getIOOperationCallback(), null);
+ if (flushOp != null) {
+ flushOps.add(flushOp);
+ }
+ }
+ }
+
+ if (primaryIndex != null) {
+ //get resource
+ ILSMIndexAccessor accessor =
+ primaryIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ //schedule flush after update
+ accessor.scheduleFlush(primaryIndex.getIOOperationCallback(), flushOps);
+
+ }
+
+ }
+
}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
index 9f071bb..ee795d5 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
@@ -236,7 +236,7 @@
return null;
}
}).when(accessor).scheduleMerge(Mockito.any(ILSMIOOperationCallback.class),
- Mockito.anyListOf(ILSMDiskComponent.class));
+ Mockito.anyListOf(ILSMDiskComponent.class), Mockito.any());
Mockito.when(index.createAccessor(Mockito.any(IModificationOperationCallback.class),
Mockito.any(ISearchOperationCallback.class))).thenReturn(accessor);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
index 3775985..f5c3013 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
@@ -56,6 +56,7 @@
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor.ICursorFactory;
+import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation;
import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
import org.apache.hyracks.storage.common.IIndexCursor;
@@ -170,7 +171,7 @@
// The only reason to override the following method is that it uses a different context object
// in addition, determining whether or not to keep deleted tuples is different here
@Override
- public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
ExternalBTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, -1);
opCtx.setOperation(IndexOperation.MERGE);
@@ -195,9 +196,11 @@
LSMComponentFileReferences relMergeFileRefs =
fileManager.getRelMergeFileReference(firstFile.getFile().getName(), lastFile.getFile().getName());
ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory);
- ioScheduler.scheduleOperation(new LSMBTreeMergeOperation(accessor, cursor,
+ MergeOperation mergeOp = new LSMBTreeMergeOperation(accessor, cursor,
relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(),
- callback, fileManager.getBaseDir().getAbsolutePath()));
+ callback, fileManager.getBaseDir().getAbsolutePath(), ctx.getDependingOps());
+ ioScheduler.scheduleOperation(mergeOp);
+ return mergeOp;
}
// This function should only be used when a transaction fail. it doesn't
@@ -369,7 +372,7 @@
// Not supported
@Override
- public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
throw new UnsupportedOperationException("flush not supported in LSM-Disk-Only-BTree");
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
index ff17905..071a4bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
@@ -319,7 +319,7 @@
}
@Override
- public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
throw HyracksDataException.create(ErrorCode.FLUSH_NOT_SUPPORTED_IN_EXTERNAL_INDEX);
}
@@ -342,7 +342,7 @@
}
@Override
- public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
ILSMIndexOperationContext bctx = createOpContext(NoOpOperationCallback.INSTANCE, 0);
bctx.setOperation(IndexOperation.MERGE);
@@ -363,11 +363,12 @@
keepDeleteTuples = mergingComponents.get(mergingComponents.size() - 1) != secondDiskComponents
.get(secondDiskComponents.size() - 1);
}
-
- ioScheduler.scheduleOperation(
+ ILSMIOOperation mergeOp =
new LSMBTreeWithBuddyMergeOperation(accessor, cursor, relMergeFileRefs.getInsertIndexFileReference(),
relMergeFileRefs.getDeleteIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(),
- callback, fileManager.getBaseDir().getAbsolutePath(), keepDeleteTuples));
+ callback, fileManager.getBaseDir().getAbsolutePath(), keepDeleteTuples, ctx.getDependingOps());
+ ioScheduler.scheduleOperation(mergeOp);
+ return mergeOp;
}
// This method creates the appropriate opContext for the targeted version
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index c7d45e1..00a489e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -462,7 +462,8 @@
LSMComponentFileReferences componentFileRefs, ILSMIOOperationCallback callback) {
ILSMIndexAccessor accessor = createAccessor(opCtx);
return new LSMBTreeFlushOperation(accessor, componentFileRefs.getInsertIndexFileReference(),
- componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath());
+ componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath(),
+ opCtx.getDependingOps());
}
@Override
@@ -611,6 +612,7 @@
}
ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor(opCtx, returnDeletedTuples);
return new LSMBTreeMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(),
- mergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath());
+ mergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath(),
+ opCtx.getDependingOps());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
index e3424e5..92b53de 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
@@ -18,7 +18,10 @@
*/
package org.apache.hyracks.storage.am.lsm.btree.impls;
+import java.util.List;
+
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
@@ -27,8 +30,9 @@
private final FileReference bloomFilterFlushTarget;
public LSMBTreeFlushOperation(ILSMIndexAccessor accessor, FileReference flushTarget,
- FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier) {
- super(accessor, flushTarget, callback, indexIdentifier);
+ FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier,
+ List<ILSMIOOperation> dependingOps) {
+ super(accessor, flushTarget, callback, indexIdentifier, dependingOps);
this.bloomFilterFlushTarget = bloomFilterFlushTarget;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
index ec96303..40ac7b1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
@@ -19,8 +19,11 @@
package org.apache.hyracks.storage.am.lsm.btree.impls;
+import java.util.List;
+
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation;
@@ -30,8 +33,9 @@
private final FileReference bloomFilterMergeTarget;
public LSMBTreeMergeOperation(ILSMIndexAccessor accessor, ITreeIndexCursor cursor, FileReference target,
- FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier) {
- super(accessor, target, callback, indexIdentifier, cursor);
+ FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier,
+ List<ILSMIOOperation> dependingOps) {
+ super(accessor, target, callback, indexIdentifier, cursor, dependingOps);
this.bloomFilterMergeTarget = bloomFilterMergeTarget;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java
index f682bde..e0c1512 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java
@@ -18,8 +18,11 @@
*/
package org.apache.hyracks.storage.am.lsm.btree.impls;
+import java.util.List;
+
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation;
@@ -32,8 +35,8 @@
public LSMBTreeWithBuddyMergeOperation(ILSMIndexAccessor accessor, ITreeIndexCursor cursor, FileReference target,
FileReference buddyBtreeMergeTarget, FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback,
- String indexIdentifier, boolean keepDeletedTuples) {
- super(accessor, target, callback, indexIdentifier, cursor);
+ String indexIdentifier, boolean keepDeletedTuples, List<ILSMIOOperation> dependingOps) {
+ super(accessor, target, callback, indexIdentifier, cursor, dependingOps);
this.buddyBtreeMergeTarget = buddyBtreeMergeTarget;
this.bloomFilterMergeTarget = bloomFilterMergeTarget;
this.keepDeletedTuples = keepDeletedTuples;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 89c8cb9..380b2f2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -105,10 +105,12 @@
*
* @param ctx
* @param callback
+ * @return The scheduled merge operation, used for the caller to track its status
* @throws HyracksDataException
* @throws IndexException
*/
- void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException;
+ ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException;
/**
* Schedule full merge
@@ -135,9 +137,11 @@
*
* @param ctx
* @param callback
+ * @return The scheduled flush operation, used for the caller to track its status
* @throws HyracksDataException
*/
- void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException;
+ ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException;
/**
* Perform a flush
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
index c2ae786..ff1613b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.storage.am.lsm.common.api;
+import java.util.List;
import java.util.concurrent.Callable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -63,7 +64,19 @@
FileReference getTarget();
/**
+ * <<<<<<< HEAD
+ *
* @return the accessor of the operation
*/
ILSMIndexAccessor getAccessor();
+
+ /**
+ * @return whether this operation has finished
+ */
+ boolean isFinished();
+
+ /**
+ * @return a list of operations that this operation depends on
+ */
+ List<ILSMIOOperation> getDependingOps();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index addeb27..5f43bcd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -67,11 +67,13 @@
public void scanDiskComponents(ILSMIndexOperationContext ctx, IIndexCursor cursor) throws HyracksDataException;
- void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException;
+ ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException;
ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException;
- void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException;
+ ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException;
ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index b8d64af..b303a39 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -46,9 +46,13 @@
*
* @param callback
* the IO operation callback
+ * @param dependingOps
+ * other operations that this operation depends on
+ * @return The scheduled flush operation
* @throws HyracksDataException
*/
- void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException;
+ ILSMIOOperation scheduleFlush(ILSMIOOperationCallback callback, List<ILSMIOOperation> dependingOps)
+ throws HyracksDataException;
/**
* Schedule a merge operation
@@ -57,11 +61,13 @@
* the merge operation callback
* @param components
* the components to be merged
+ * @param dependingOps
+ * other operations that this operation depends on
+ * @return The scheduled merge operation
* @throws HyracksDataException
- * @throws IndexException
*/
- void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components)
- throws HyracksDataException;
+ ILSMIOOperation scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components,
+ List<ILSMIOOperation> dependingOps) throws HyracksDataException;
/**
* Schedule a full merge
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
index 5b0378a..66af93f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
@@ -56,4 +56,8 @@
PermutingTupleReference getFilterTuple();
MultiComparator getFilterCmp();
+
+ List<ILSMIOOperation> getDependingOps();
+
+ void setDependingOps(List<ILSMIOOperation> dependingOps);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
index aee46f0..1d13c94 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
@@ -18,6 +18,10 @@
*/
package org.apache.hyracks.storage.am.lsm.common.impls;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IODeviceHandle;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
@@ -30,13 +34,32 @@
protected final FileReference target;
protected final ILSMIOOperationCallback callback;
protected final String indexIdentifier;
+ protected final List<ILSMIOOperation> dependingOps;
+
+ protected AtomicBoolean isFinished = new AtomicBoolean(false);
public AbstractIoOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback,
- String indexIdentifier) {
+ String indexIdentifier, List<ILSMIOOperation> dependingOps) {
this.accessor = accessor;
this.target = target;
this.callback = callback;
this.indexIdentifier = indexIdentifier;
+ this.dependingOps = dependingOps;
+ }
+
+ protected abstract void callInternal() throws HyracksDataException;
+
+ @Override
+ public Boolean call() throws HyracksDataException {
+ try {
+ callInternal();
+ } finally {
+ synchronized (this) {
+ isFinished.set(true);
+ notifyAll();
+ }
+ }
+ return true;
}
@Override
@@ -63,4 +86,14 @@
public String getIndexIdentifier() {
return indexIdentifier;
}
+
+ @Override
+ public List<ILSMIOOperation> getDependingOps() {
+ return dependingOps;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return isFinished.get();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index dc64f9b..e7b21cb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -86,6 +86,7 @@
protected final int[] treeFields;
protected final int[] filterFields;
protected final boolean durable;
+ protected final ILSMMergePolicy mergePolicy;
protected boolean isActive;
protected final AtomicBoolean[] flushRequests;
protected boolean memoryComponentsAllocated = false;
@@ -113,6 +114,7 @@
this.inactiveDiskComponents = new LinkedList<>();
this.durable = durable;
this.tracer = tracer;
+ this.mergePolicy = mergePolicy;
lsmHarness = new LSMHarness(this, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled(), tracer);
isActive = false;
diskComponents = new ArrayList<>();
@@ -135,6 +137,7 @@
this.ioScheduler = ioScheduler;
this.ioOpCallback = ioOpCallback;
this.durable = durable;
+ this.mergePolicy = mergePolicy;
lsmHarness = new ExternalIndexHarness(this, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled());
isActive = false;
diskComponents = new LinkedList<>();
@@ -199,7 +202,7 @@
protected void flushMemoryComponent() throws HyracksDataException {
BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(ioOpCallback);
ILSMIndexAccessor accessor = createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- accessor.scheduleFlush(cb);
+ accessor.scheduleFlush(cb, null);
try {
cb.waitForIO();
} catch (InterruptedException e) {
@@ -326,19 +329,21 @@
}
@Override
- public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
AbstractLSMIndexOperationContext opCtx =
createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
opCtx.setOperation(ctx.getOperation());
opCtx.getComponentHolder().addAll(ctx.getComponentHolder());
+ opCtx.setDependingOps(ctx.getDependingOps());
ILSMIOOperation flushOp = createFlushOperation(opCtx, componentFileRefs, callback);
ioScheduler.scheduleOperation(TracedIOOperation.wrap(flushOp, tracer));
+ return flushOp;
}
@Override
- public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
// merge must create a different op ctx
@@ -346,11 +351,13 @@
createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
opCtx.setOperation(ctx.getOperation());
opCtx.getComponentHolder().addAll(mergingComponents);
+ opCtx.setDependingOps(ctx.getDependingOps());
ILSMDiskComponent firstComponent = (ILSMDiskComponent) mergingComponents.get(0);
ILSMDiskComponent lastComponent = (ILSMDiskComponent) mergingComponents.get(mergingComponents.size() - 1);
LSMComponentFileReferences mergeFileRefs = getMergeFileReferences(firstComponent, lastComponent);
- ILSMIOOperation mergeOp = createMergeOperation(opCtx, mergeFileRefs, callback);
+ MergeOperation mergeOp = (MergeOperation) createMergeOperation(opCtx, mergeFileRefs, callback);
ioScheduler.scheduleOperation(TracedIOOperation.wrap(mergeOp, tracer));
+ return mergeOp;
}
private void addOperationalMutableComponents(List<ILSMComponent> operationalComponents) {
@@ -628,6 +635,10 @@
: doMerge(operation);
}
+ public ILSMMergePolicy getMergePolicy() {
+ return mergePolicy;
+ }
+
public abstract Set<String> getLSMComponentPhysicalFiles(ILSMComponent newComponent);
protected abstract void allocateMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java
index 065d465..5fdeafd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java
@@ -27,6 +27,7 @@
import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
@@ -47,6 +48,7 @@
protected IndexOperation op;
protected boolean accessingComponents = false;
protected ISearchPredicate searchPredicate;
+ protected final List<ILSMIOOperation> dependingOps;
public AbstractLSMIndexOperationContext(int[] treeFields, int[] filterFields,
IBinaryComparatorFactory[] filterCmpFactories, ISearchOperationCallback searchCallback,
@@ -56,6 +58,7 @@
this.componentHolder = new LinkedList<>();
this.componentsToBeMerged = new LinkedList<>();
this.componentsToBeReplicated = new LinkedList<>();
+ this.dependingOps = new LinkedList<>();
if (filterFields != null) {
indexTuple = new PermutingTupleReference(treeFields);
filterCmp = MultiComparator.create(filterCmpFactories);
@@ -153,4 +156,17 @@
public ISearchPredicate getSearchPredicate() {
return searchPredicate;
}
+
+ @Override
+ public List<ILSMIOOperation> getDependingOps() {
+ return dependingOps;
+ }
+
+ @Override
+ public void setDependingOps(List<ILSMIOOperation> dependingOps) {
+ this.dependingOps.clear();
+ if (dependingOps != null) {
+ this.dependingOps.addAll(dependingOps);
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
index 847b882..7ac9bfb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
@@ -49,7 +49,7 @@
} else if (immutableComponents.size() >= numComponents) {
ILSMIndexAccessor accessor =
index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents);
+ accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents, null);
}
}
@@ -106,7 +106,7 @@
}
ILSMIndexAccessor accessor =
index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents);
+ accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents, null);
return true;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
index 2f65b18..b93d943 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
@@ -203,13 +203,13 @@
}
@Override
- public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
callback.afterFinalize(LSMOperationType.MERGE, null);
- return;
+ return null;
}
- lsmIndex.scheduleMerge(ctx, callback);
+ return lsmIndex.scheduleMerge(ctx, callback);
}
@Override
@@ -297,9 +297,10 @@
}
@Override
- public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
callback.afterFinalize(LSMOperationType.FLUSH, null);
+ return null;
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java
index 7b7f950..750c690 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.storage.am.lsm.common.impls;
+import java.util.List;
import java.util.Objects;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -30,14 +31,13 @@
public class FlushOperation extends AbstractIoOperation implements Comparable<ILSMIOOperation> {
public FlushOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback,
- String indexIdentifier) {
- super(accessor, target, callback, indexIdentifier);
+ String indexIdentifier, List<ILSMIOOperation> dependingOps) {
+ super(accessor, target, callback, indexIdentifier, dependingOps);
}
@Override
- public Boolean call() throws HyracksDataException {
+ protected void callInternal() throws HyracksDataException {
accessor.flush(this);
- return true;
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 1069f8f..bc3a5a1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -500,13 +500,13 @@
}
@Override
- public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
if (!getAndEnterComponents(ctx, LSMOperationType.FLUSH, true)) {
callback.afterFinalize(LSMOperationType.FLUSH, null);
- return;
+ return null;
}
- lsmIndex.scheduleFlush(ctx, callback);
+ return lsmIndex.scheduleFlush(ctx, callback);
}
@Override
@@ -519,6 +519,7 @@
boolean failedOperation = false;
try {
newComponent = lsmIndex.flush(operation);
+ waitForDependingOps(operation);
operation.getCallback().afterOperation(LSMOperationType.FLUSH, null, newComponent);
newComponent.markAsValid(lsmIndex.isDurable());
} catch (Throwable e) {
@@ -537,13 +538,13 @@
}
@Override
- public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
callback.afterFinalize(LSMOperationType.MERGE, null);
- return;
+ return null;
}
- lsmIndex.scheduleMerge(ctx, callback);
+ return lsmIndex.scheduleMerge(ctx, callback);
}
@Override
@@ -570,6 +571,7 @@
boolean failedOperation = false;
try {
newComponent = lsmIndex.merge(operation);
+ waitForDependingOps(operation);
operation.getCallback().afterOperation(LSMOperationType.MERGE, ctx.getComponentHolder(), newComponent);
newComponent.markAsValid(lsmIndex.isDurable());
} catch (Throwable e) {
@@ -754,6 +756,32 @@
}
}
+ /**
+ * Wait for depending operations to finish.
+ *
+ * @param op
+ */
+ private void waitForDependingOps(ILSMIOOperation op) throws HyracksDataException {
+ List<ILSMIOOperation> dependingOps = op.getDependingOps();
+ if (dependingOps == null) {
+ return;
+ }
+ for (ILSMIOOperation dependingOp : dependingOps) {
+ if (dependingOp != null && !dependingOp.isFinished()) {
+ synchronized (dependingOp) {
+ while (!dependingOp.isFinished()) {
+ try {
+ dependingOp.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e);
+ }
+ }
+ }
+ }
+ }
+ }
+
@Override
public void deleteComponents(ILSMIndexOperationContext ctx, Predicate<ILSMComponent> predicate)
throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index c0fd443..f008fde 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -135,18 +135,21 @@
}
@Override
- public void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException {
+ public ILSMIOOperation scheduleFlush(ILSMIOOperationCallback callback, List<ILSMIOOperation> dependingOps)
+ throws HyracksDataException {
ctx.setOperation(IndexOperation.FLUSH);
- lsmHarness.scheduleFlush(ctx, callback);
+ ctx.setDependingOps(dependingOps);
+ return lsmHarness.scheduleFlush(ctx, callback);
}
@Override
- public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components)
- throws HyracksDataException {
+ public ILSMIOOperation scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components,
+ List<ILSMIOOperation> dependingOps) throws HyracksDataException {
ctx.setOperation(IndexOperation.MERGE);
ctx.getComponentsToBeMerged().clear();
ctx.getComponentsToBeMerged().addAll(components);
- lsmHarness.scheduleMerge(ctx, callback);
+ ctx.setDependingOps(dependingOps);
+ return lsmHarness.scheduleMerge(ctx, callback);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java
index c83d534..2210fd0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.common.IIndexCursor;
@@ -31,8 +32,8 @@
protected final IIndexCursor cursor;
public MergeOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback,
- String indexIdentifier, IIndexCursor cursor) {
- super(accessor, target, callback, indexIdentifier);
+ String indexIdentifier, IIndexCursor cursor, List<ILSMIOOperation> dependingOps) {
+ super(accessor, target, callback, indexIdentifier, dependingOps);
this.cursor = cursor;
}
@@ -41,12 +42,6 @@
}
@Override
- public Boolean call() throws HyracksDataException {
- accessor.merge(this);
- return true;
- }
-
- @Override
public LSMIOOpertionType getIOOpertionType() {
return LSMIOOpertionType.MERGE;
}
@@ -54,4 +49,10 @@
public IIndexCursor getCursor() {
return cursor;
}
+
+ @Override
+ protected void callInternal() throws HyracksDataException {
+ accessor.merge(this);
+
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
index 7d7266e..f159232 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
@@ -247,7 +247,7 @@
Collections.reverse(mergableComponents);
ILSMIndexAccessor accessor =
index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- accessor.scheduleMerge(index.getIOOperationCallback(), mergableComponents);
+ accessor.scheduleMerge(index.getIOOperationCallback(), mergableComponents, null);
}
/**
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java
index 85081a1..7cdcc52 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java
@@ -59,7 +59,7 @@
&& index.hasFlushRequestForCurrentMutableComponent()) {
ILSMIndexAccessor accessor =
index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback());
+ accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), null);
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
index 9cc8022..3e35e51 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.storage.am.lsm.common.impls;
+import java.util.List;
import java.util.logging.Logger;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -100,6 +101,16 @@
public ILSMIndexAccessor getAccessor() {
return ioOp.getAccessor();
}
+
+ @Override
+ public boolean isFinished() {
+ return ioOp.isFinished();
+ }
+
+ @Override
+ public List<ILSMIOOperation> getDependingOps() {
+ return ioOp.getDependingOps();
+ }
}
class ComparableTracedIOOperation extends TracedIOOperation implements Comparable<ILSMIOOperation> {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index eb3924c..60da50a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -691,17 +691,20 @@
LSMComponentFileReferences componentFileRefs, ILSMIOOperationCallback callback)
throws HyracksDataException {
return new LSMInvertedIndexFlushOperation(new LSMInvertedIndexAccessor(getLsmHarness(), opCtx),
- componentFileRefs.getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(),
- componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath());
+ componentFileRefs.getInsertIndexFileReference(),
+ componentFileRefs.getDeleteIndexFileReference(), componentFileRefs.getBloomFilterFileReference(),
+ callback, fileManager.getBaseDir().getAbsolutePath(), opCtx.getDependingOps());
}
@Override
protected ILSMIOOperation createMergeOperation(AbstractLSMIndexOperationContext opCtx,
- LSMComponentFileReferences mergeFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException {
+ LSMComponentFileReferences mergeFileRefs,
+ ILSMIOOperationCallback callback) throws HyracksDataException {
ILSMIndexAccessor accessor = new LSMInvertedIndexAccessor(getLsmHarness(), opCtx);
IIndexCursor cursor = new LSMInvertedIndexRangeSearchCursor(opCtx);
- return new LSMInvertedIndexMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(),
- mergeFileRefs.getDeleteIndexFileReference(), mergeFileRefs.getBloomFilterFileReference(), callback,
- fileManager.getBaseDir().getAbsolutePath());
+ return new LSMInvertedIndexMergeOperation(accessor, cursor,
+ mergeFileRefs.getInsertIndexFileReference(), mergeFileRefs.getDeleteIndexFileReference(),
+ mergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath(),
+ opCtx.getDependingOps());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index 61fc84e..242bc83 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -85,9 +85,11 @@
}
@Override
- public void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException {
+ public ILSMIOOperation scheduleFlush(ILSMIOOperationCallback callback, List<ILSMIOOperation> dependingOps)
+ throws HyracksDataException {
ctx.setOperation(IndexOperation.FLUSH);
- lsmHarness.scheduleFlush(ctx, callback);
+ ctx.setDependingOps(dependingOps);
+ return lsmHarness.scheduleFlush(ctx, callback);
}
@Override
@@ -96,12 +98,13 @@
}
@Override
- public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components)
- throws HyracksDataException {
+ public ILSMIOOperation scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components,
+ List<ILSMIOOperation> dependingOps) throws HyracksDataException {
ctx.setOperation(IndexOperation.MERGE);
ctx.getComponentsToBeMerged().clear();
ctx.getComponentsToBeMerged().addAll(components);
- lsmHarness.scheduleMerge(ctx, callback);
+ ctx.setDependingOps(dependingOps);
+ return lsmHarness.scheduleMerge(ctx, callback);
}
@Override
@@ -116,6 +119,7 @@
@Override
public void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException {
ctx.setOperation(IndexOperation.FULL_MERGE);
+ ctx.setDependingOps(null);
lsmHarness.scheduleFullMerge(ctx, callback);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
index 2106f6a..30e1cec 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
@@ -19,7 +19,10 @@
package org.apache.hyracks.storage.am.lsm.invertedindex.impls;
+import java.util.List;
+
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
@@ -30,8 +33,8 @@
public LSMInvertedIndexFlushOperation(ILSMIndexAccessor accessor, FileReference flushTarget,
FileReference deletedKeysBTreeFlushTarget, FileReference bloomFilterFlushTarget,
- ILSMIOOperationCallback callback, String indexIdentifier) {
- super(accessor, flushTarget, callback, indexIdentifier);
+ ILSMIOOperationCallback callback, String indexIdentifier, List<ILSMIOOperation> dependingOps) {
+ super(accessor, flushTarget, callback, indexIdentifier, dependingOps);
this.deletedKeysBTreeFlushTarget = deletedKeysBTreeFlushTarget;
this.bloomFilterFlushTarget = bloomFilterFlushTarget;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
index 2c1db0f..8361f24 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
@@ -19,7 +19,10 @@
package org.apache.hyracks.storage.am.lsm.invertedindex.impls;
+import java.util.List;
+
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation;
@@ -31,8 +34,8 @@
public LSMInvertedIndexMergeOperation(ILSMIndexAccessor accessor, IIndexCursor cursor, FileReference target,
FileReference deletedKeysBTreeMergeTarget, FileReference bloomFilterMergeTarget,
- ILSMIOOperationCallback callback, String indexIdentifier) {
- super(accessor, target, callback, indexIdentifier, cursor);
+ ILSMIOOperationCallback callback, String indexIdentifier, List<ILSMIOOperation> dependingOps) {
+ super(accessor, target, callback, indexIdentifier, cursor, dependingOps);
this.deletedKeysBTreeMergeTarget = deletedKeysBTreeMergeTarget;
this.bloomFilterMergeTarget = bloomFilterMergeTarget;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
index 6595403..110d873 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
@@ -416,7 +416,7 @@
// Not supported
@Override
- public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
throw new UnsupportedOperationException("flush not supported in LSM-Disk-Only-RTree");
}
@@ -623,7 +623,7 @@
// The only change the the schedule merge is the method used to create the
// opCtx. first line <- in schedule merge, we->
@Override
- public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
ILSMIndexOperationContext rctx = createOpContext(NoOpOperationCallback.INSTANCE, -1);
rctx.setOperation(IndexOperation.MERGE);
@@ -634,10 +634,12 @@
(ILSMDiskComponent) mergingComponents.get(mergingComponents.size() - 1));
ILSMIndexAccessor accessor = new LSMRTreeAccessor(getLsmHarness(), rctx, buddyBTreeFields);
// create the merge operation.
- LSMRTreeMergeOperation mergeOp = new LSMRTreeMergeOperation(accessor, cursor,
- relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getDeleteIndexFileReference(),
- relMergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath());
+ ILSMIOOperation mergeOp =
+ new LSMRTreeMergeOperation(accessor, cursor, relMergeFileRefs.getInsertIndexFileReference(),
+ relMergeFileRefs.getDeleteIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(),
+ callback, fileManager.getBaseDir().getAbsolutePath(), ctx.getDependingOps());
ioScheduler.scheduleOperation(mergeOp);
+ return mergeOp;
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index ca0e4e1..a4d6a65 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -420,7 +420,7 @@
LSMRTreeAccessor accessor = new LSMRTreeAccessor(getLsmHarness(), opCtx, buddyBTreeFields);
return new LSMRTreeFlushOperation(accessor, componentFileRefs.getInsertIndexFileReference(),
componentFileRefs.getDeleteIndexFileReference(), componentFileRefs.getBloomFilterFileReference(),
- callback, fileManager.getBaseDir().getAbsolutePath());
+ callback, fileManager.getBaseDir().getAbsolutePath(), opCtx.getDependingOps());
}
@Override
@@ -430,6 +430,6 @@
ILSMIndexAccessor accessor = new LSMRTreeAccessor(getLsmHarness(), opCtx, buddyBTreeFields);
return new LSMRTreeMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(),
mergeFileRefs.getDeleteIndexFileReference(), mergeFileRefs.getBloomFilterFileReference(), callback,
- fileManager.getBaseDir().getAbsolutePath());
+ fileManager.getBaseDir().getAbsolutePath(), opCtx.getDependingOps());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
index 6991c56..a08d854 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
@@ -18,7 +18,10 @@
*/
package org.apache.hyracks.storage.am.lsm.rtree.impls;
+import java.util.List;
+
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
@@ -29,8 +32,9 @@
private final FileReference bloomFilterFlushTarget;
public LSMRTreeFlushOperation(ILSMIndexAccessor accessor, FileReference flushTarget, FileReference btreeFlushTarget,
- FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier) {
- super(accessor, flushTarget, callback, indexIdentifier);
+ FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier,
+ List<ILSMIOOperation> dependingOps) {
+ super(accessor, flushTarget, callback, indexIdentifier, dependingOps);
this.btreeFlushTarget = btreeFlushTarget;
this.bloomFilterFlushTarget = bloomFilterFlushTarget;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
index 83872cf..a07e57b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
@@ -18,8 +18,11 @@
*/
package org.apache.hyracks.storage.am.lsm.rtree.impls;
+import java.util.List;
+
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation;
@@ -30,8 +33,8 @@
public LSMRTreeMergeOperation(ILSMIndexAccessor accessor, ITreeIndexCursor cursor, FileReference target,
FileReference btreeMergeTarget, FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback,
- String indexIdentifier) {
- super(accessor, target, callback, indexIdentifier, cursor);
+ String indexIdentifier, List<ILSMIOOperation> dependingOps) {
+ super(accessor, target, callback, indexIdentifier, cursor, dependingOps);
this.btreeMergeTarget = btreeMergeTarget;
this.bloomFilterMergeTarget = bloomFilterMergeTarget;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 1e15455..a18f10c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -332,7 +332,7 @@
throws HyracksDataException {
ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory);
return new LSMRTreeFlushOperation(accessor, componentFileRefs.getInsertIndexFileReference(), null, null,
- callback, fileManager.getBaseDir().getAbsolutePath());
+ callback, fileManager.getBaseDir().getAbsolutePath(), opCtx.getDependingOps());
}
@Override
@@ -346,6 +346,6 @@
ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor(opCtx, returnDeletedTuples);
ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory);
return new MergeOperation(accessor, mergeFileRefs.getInsertIndexFileReference(), callback,
- fileManager.getBaseDir().getAbsolutePath(), cursor);
+ fileManager.getBaseDir().getAbsolutePath(), cursor, opCtx.getDependingOps());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java
index 5d6d8de..4f7515e 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java
@@ -72,7 +72,7 @@
accessor.insert(tuple);
// Flush to generate a disk component
- accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback());
+ accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), null);
// Make sure the disk component was generated
LSMBTree btree = (LSMBTree) ctx.getIndex();
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFilterMergeTestDriver.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFilterMergeTestDriver.java
index c5eb97c..98faf01 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFilterMergeTestDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFilterMergeTestDriver.java
@@ -117,7 +117,7 @@
StubIOOperationCallback stub = new StubIOOperationCallback();
BlockingIOOperationCallbackWrapper waiter = new BlockingIOOperationCallbackWrapper(stub);
- accessor.scheduleFlush(waiter);
+ accessor.scheduleFlush(waiter, null);
waiter.waitForIO();
if (minMax != null) {
Pair<ITupleReference, ITupleReference> obsMinMax =
@@ -146,7 +146,7 @@
}
}
accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
- ((LSMBTree) ctx.getIndex()).getDiskComponents());
+ ((LSMBTree) ctx.getIndex()).getDiskComponents(), null);
flushedComponents = ((LSMBTree) ctx.getIndex()).getDiskComponents();
Pair<ITupleReference, ITupleReference> mergedMinMax =
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
index 7dac1e5..15dbfdb 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
@@ -76,7 +76,7 @@
ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
- ((LSMBTree) ctx.getIndex()).getDiskComponents());
+ ((LSMBTree) ctx.getIndex()).getDiskComponents(), null);
orderedIndexTestUtils.checkPointSearches(ctx);
orderedIndexTestUtils.checkScan(ctx);
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
index b633614..11ecf0b 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
@@ -81,7 +81,7 @@
}
if (j == 1) {
- accessor.scheduleFlush(ioOpCallback);
+ accessor.scheduleFlush(ioOpCallback, null);
ioOpCallback.waitForIO();
isFoundNull = true;
} else {
@@ -94,7 +94,7 @@
}
if (j == 1) {
- accessor.scheduleFlush(ioOpCallback);
+ accessor.scheduleFlush(ioOpCallback, null);
ioOpCallback.waitForIO();
isFoundNull = true;
} else {
@@ -106,7 +106,7 @@
accessor.delete(tuple);
}
- accessor.scheduleFlush(ioOpCallback);
+ accessor.scheduleFlush(ioOpCallback, null);
ioOpCallback.waitForIO();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java
index acbeaef..f3cf9de 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java
@@ -19,7 +19,7 @@
package org.apache.hyracks.storage.am.lsm.btree;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
@@ -116,17 +116,17 @@
//component 2 contains 1 and 2
upsertTuple(ctx, fieldSerdes, getValue(1, fieldSerdes));
upsertTuple(ctx, fieldSerdes, getValue(2, fieldSerdes));
- accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback());
+ accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), null);
//component 1 contains 1 and -2
upsertTuple(ctx, fieldSerdes, getValue(1, fieldSerdes));
deleteTuple(ctx, fieldSerdes, getValue(2, fieldSerdes));
- accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback());
+ accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), null);
//component 0 contains 2 and 3
upsertTuple(ctx, fieldSerdes, getValue(3, fieldSerdes));
upsertTuple(ctx, fieldSerdes, getValue(2, fieldSerdes));
- accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback());
+ accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), null);
LSMBTree btree = (LSMBTree) ctx.getIndex();
Assert.assertEquals("Check disk components", 3, btree.getDiskComponents().size());
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceScanDiskComponentsTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceScanDiskComponentsTest.java
index fbcbcc2..a7efd6d 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceScanDiskComponentsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceScanDiskComponentsTest.java
@@ -19,7 +19,7 @@
package org.apache.hyracks.storage.am.lsm.btree;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
@@ -352,7 +352,7 @@
op1.performOperation(ctx, AccessMethodTestsConfig.BTREE_NUM_TUPLES_TO_INSERT);
op2.performOperation(ctx,
AccessMethodTestsConfig.BTREE_NUM_TUPLES_TO_INSERT / AccessMethodTestsConfig.BTREE_NUM_INSERT_ROUNDS);
- accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback());
+ accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), null);
LSMBTree btree = (LSMBTree) ctx.getIndex();
Assert.assertEquals("Check disk components", 1, btree.getDiskComponents().size());
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java
index e059faa..adcb3d9 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java
@@ -109,7 +109,7 @@
}
if (j == 1) {
- lsmAccessor.scheduleFlush(ioOpCallback);
+ lsmAccessor.scheduleFlush(ioOpCallback, null);
ioOpCallback.waitForIO();
isFoundNull = true;
isUpdated = false;
@@ -124,7 +124,7 @@
}
if (j == 1) {
- lsmAccessor.scheduleFlush(ioOpCallback);
+ lsmAccessor.scheduleFlush(ioOpCallback, null);
ioOpCallback.waitForIO();
} else {
isFoundNull = false;
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
index 6c1a406..0e16280 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
@@ -103,17 +103,19 @@
}
@Override
- public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
- super.scheduleFlush(ctx, callback);
+ ILSMIOOperation flushOp = super.scheduleFlush(ctx, callback);
numScheduledFlushes++;
+ return flushOp;
}
@Override
- public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
- super.scheduleMerge(ctx, callback);
+ ILSMIOOperation mergeOp = super.scheduleMerge(ctx, callback);
numScheduledMerges++;
+ return mergeOp;
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
index 1667e47..377ce05 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
@@ -119,7 +119,7 @@
case MERGE:
accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
- lsmBTree.getDiskComponents());
+ lsmBTree.getDiskComponents(), null);
break;
default:
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java
index e521b4b..eaa0321 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java
@@ -221,7 +221,7 @@
return null;
}
}).when(accessor).scheduleMerge(Mockito.any(ILSMIOOperationCallback.class),
- Mockito.anyListOf(ILSMDiskComponent.class));
+ Mockito.anyListOf(ILSMDiskComponent.class), Mockito.any());
Mockito.when(index.createAccessor(Mockito.any(IModificationOperationCallback.class),
Mockito.any(ISearchOperationCallback.class))).thenReturn(accessor);
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
index d093aac..fd744b0 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
@@ -58,7 +58,7 @@
}
// Perform merge.
invIndexAccessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
- ((LSMInvertedIndex) invIndex).getDiskComponents());
+ ((LSMInvertedIndex) invIndex).getDiskComponents(), null);
validateAndCheckIndex(testCtx);
runTinySearchWorkload(testCtx, tupleGen);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
index 3dc7262..cfca8f1 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
@@ -60,7 +60,7 @@
}
// Perform merge.
invIndexAccessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
- ((LSMInvertedIndex) invIndex).getDiskComponents());
+ ((LSMInvertedIndex) invIndex).getDiskComponents(), null);
validateAndCheckIndex(testCtx);
runTinySearchWorkload(testCtx, tupleGen);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
index 2345698..594e019 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
@@ -116,7 +116,7 @@
case MERGE: {
accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
- invIndex.getDiskComponents());
+ invIndex.getDiskComponents(), null);
break;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
index 9209a3e..58c71dd 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
@@ -78,7 +78,7 @@
ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
- ((AbstractLSMRTree) ctx.getIndex()).getDiskComponents());
+ ((AbstractLSMRTree) ctx.getIndex()).getDiskComponents(), null);
rTreeTestUtils.checkScan(ctx);
rTreeTestUtils.checkDiskOrderScan(ctx);
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
index fe4870b..4630c28 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
@@ -79,7 +79,7 @@
case MERGE:
accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
- lsmRTree.getDiskComponents());
+ lsmRTree.getDiskComponents(), null);
break;
default:
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
index 2855f2e..bbada89 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
@@ -68,7 +68,7 @@
case MERGE:
accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
- ((AbstractLSMRTree) lsmRTree).getDiskComponents());
+ ((AbstractLSMRTree) lsmRTree).getDiskComponents(), null);
break;
default: