[NO ISSUE][STO] Component Deletes Through flushes and merges
- user model changes: no
- storage format changes: no
- interface changes: yes
- moved validation of component from the index:
- ILSMIndex and all of its implementations
to the component:
- ILSMDiskComponent and all of its implementations
details:
- This change enables component level deletes.
Change-Id: I178656207bfa1d15e6ae5ff2403a16df33940773
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2017
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 489e5de..717b950 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -16,7 +16,8 @@
! specific language governing permissions and limitations
! under the License.
!-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>apache-asterixdb</artifactId>
@@ -169,7 +170,7 @@
</goals>
<configuration>
<licenses combine.children="append">
- <license implementation="org.apache.rat.analysis.license.MITLicense"/>
+ <license implementation="org.apache.rat.analysis.license.MITLicense" />
</licenses>
<excludes combine.children="append">
<exclude>src/test/resources/**/results_parser_sqlpp/**</exclude>
@@ -192,7 +193,7 @@
<configuration>
<reportFile>${project.build.directory}/webqueryui-rat.txt</reportFile>
<licenses combine.children="append">
- <license implementation="org.apache.rat.analysis.license.MITLicense"/>
+ <license implementation="org.apache.rat.analysis.license.MITLicense" />
<license implementation="org.apache.rat.analysis.license.SimplePatternBasedLicense">
<licenseFamilyCategory>MIT</licenseFamilyCategory>
<licenseFamilyName>JQuery</licenseFamilyName>
@@ -210,7 +211,7 @@
</license>
</licenses>
<licenseFamilies combine.children="append">
- <licenseFamily implementation="org.apache.rat.license.MITLicenseFamily"/>
+ <licenseFamily implementation="org.apache.rat.license.MITLicenseFamily" />
<licenseFamily implementation="org.apache.rat.license.SimpleLicenseFamily">
<familyName>JQuery</familyName>
</licenseFamily>
@@ -567,5 +568,11 @@
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-storage-am-lsm-btree-test</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
-</project>
+</project>
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 10af9ff..04e6313 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -362,7 +362,7 @@
try {
maxDiskLastLsn =
((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
- .getComponentLSN(lsmIndex.getImmutableComponents());
+ .getComponentLSN(lsmIndex.getDiskComponents());
} catch (HyracksDataException e) {
datasetLifecycleManager.close(localResource.getPath());
throw e;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 53a4f23..104f80b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -39,14 +39,12 @@
import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
import org.apache.asterix.common.transactions.ITransactionManager;
import org.apache.asterix.file.StorageComponentProvider;
-import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.formats.nontagged.TypeTraitProvider;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.metadata.utils.MetadataUtil;
import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
import org.apache.asterix.om.types.ARecordType;
@@ -68,7 +66,6 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
@@ -83,7 +80,6 @@
import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorNodePushable;
import org.apache.hyracks.storage.am.common.api.IIndexBuilder;
-import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
@@ -93,11 +89,10 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
import org.apache.hyracks.storage.common.IResourceFactory;
+import org.apache.hyracks.storage.common.IStorageManager;
import org.apache.hyracks.test.support.TestUtils;
import org.apache.hyracks.util.file.FileUtil;
import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
public class TestNodeController {
protected static final Logger LOGGER = Logger.getLogger(TestNodeController.class.getName());
@@ -144,12 +139,7 @@
}
jobletCtx = Mockito.mock(IHyracksJobletContext.class);
Mockito.when(jobletCtx.getServiceContext()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getContext());
- Mockito.when(jobletCtx.getJobId()).thenAnswer(new Answer<JobId>() {
- @Override
- public JobId answer(InvocationOnMock invocation) throws Throwable {
- return jobId;
- }
- });
+ Mockito.when(jobletCtx.getJobId()).thenReturn(jobId);
}
public void deInit() throws Exception {
@@ -167,8 +157,7 @@
int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
StorageComponentProvider storageComponentProvider) throws AlgebricksException, HyracksDataException {
PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
- mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators,
- storageComponentProvider);
+ mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators);
IndexOperation op = IndexOperation.INSERT;
IModificationOperationCallbackFactory modOpCallbackFactory =
new PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(), dataset.getDatasetId(),
@@ -176,7 +165,7 @@
ResourceType.LSM_BTREE);
IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
- storageComponentProvider.getStorageManager(), primaryIndexInfo.fileSplitProvider);
+ storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
LSMInsertDeleteOperatorNodePushable insertOp = new LSMInsertDeleteOperatorNodePushable(ctx, PARTITION,
primaryIndexInfo.primaryIndexInsertFieldsPermutations,
recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0), op,
@@ -196,10 +185,9 @@
IPushRuntime emptyTupleOp = new EmptyTupleSourceRuntimeFactory().createPushRuntime(ctx);
JobSpecification spec = new JobSpecification();
PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
- mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators,
- storageComponentProvider);
+ mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators);
IIndexDataflowHelperFactory indexDataflowHelperFactory = new IndexDataflowHelperFactory(
- storageComponentProvider.getStorageManager(), primaryIndexInfo.fileSplitProvider);
+ storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
BTreeSearchOperatorDescriptor searchOpDesc = new BTreeSearchOperatorDescriptor(spec, primaryIndexInfo.rDesc,
null, null, true, true, indexDataflowHelperFactory, false, false, null,
NoOpOperationCallbackFactory.INSTANCE, filterFields, filterFields, false);
@@ -236,13 +224,12 @@
}
}
- public void createPrimaryIndex(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType,
+ public PrimaryIndexInfo createPrimaryIndex(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType,
ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
int[] filterFields, IStorageComponentProvider storageComponentProvider, int[] primaryKeyIndexes,
List<Integer> primaryKeyIndicators) throws AlgebricksException, HyracksDataException {
PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
- mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators,
- storageComponentProvider);
+ mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators);
Dataverse dataverse = new Dataverse(dataset.getDataverseName(), NonTaggedDataFormat.class.getName(),
MetadataUtil.PENDING_NO_OP);
MetadataProvider mdProvider = new MetadataProvider(
@@ -252,34 +239,17 @@
recordType, metaType, mergePolicyFactory, mergePolicyProperties);
IndexBuilderFactory indexBuilderFactory =
new IndexBuilderFactory(storageComponentProvider.getStorageManager(),
- primaryIndexInfo.fileSplitProvider, resourceFactory, !dataset.isTemp());
+ primaryIndexInfo.getFileSplitProvider(), resourceFactory, !dataset.isTemp());
IHyracksTaskContext ctx = createTestContext(false);
IIndexBuilder indexBuilder = indexBuilderFactory.create(ctx, 0);
indexBuilder.build();
} finally {
mdProvider.getLocks().unlock();
}
+ return primaryIndexInfo;
}
- private int[] createPrimaryIndexBloomFilterFields(int length) {
- int[] primaryIndexBloomFilterKeyFields = new int[length];
- for (int j = 0; j < length; ++j) {
- primaryIndexBloomFilterKeyFields[j] = j;
- }
- return primaryIndexBloomFilterKeyFields;
- }
-
- private IBinaryComparatorFactory[] createPrimaryIndexComparatorFactories(IAType[] primaryKeyTypes) {
- IBinaryComparatorFactory[] primaryIndexComparatorFactories =
- new IBinaryComparatorFactory[primaryKeyTypes.length];
- for (int j = 0; j < primaryKeyTypes.length; ++j) {
- primaryIndexComparatorFactories[j] =
- BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(primaryKeyTypes[j], true);
- }
- return primaryIndexComparatorFactories;
- }
-
- private ISerializerDeserializer<?>[] createPrimaryIndexSerdes(int primaryIndexNumOfTupleFields,
+ public static ISerializerDeserializer<?>[] createPrimaryIndexSerdes(int primaryIndexNumOfTupleFields,
IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType) {
int i = 0;
ISerializerDeserializer<?>[] primaryIndexSerdes = new ISerializerDeserializer<?>[primaryIndexNumOfTupleFields];
@@ -294,7 +264,7 @@
return primaryIndexSerdes;
}
- private ITypeTraits[] createPrimaryIndexTypeTraits(int primaryIndexNumOfTupleFields, IAType[] primaryKeyTypes,
+ public static ITypeTraits[] createPrimaryIndexTypeTraits(int primaryIndexNumOfTupleFields, IAType[] primaryKeyTypes,
ARecordType recordType, ARecordType metaType) {
ITypeTraits[] primaryIndexTypeTraits = new ITypeTraits[primaryIndexNumOfTupleFields];
int i = 0;
@@ -336,55 +306,34 @@
return (DatasetLifecycleManager) getAppRuntimeContext().getDatasetLifecycleManager();
}
- @SuppressWarnings("unused")
- private class PrimaryIndexInfo {
- private Dataset dataset;
+ public static class PrimaryIndexInfo {
private IAType[] primaryKeyTypes;
private ARecordType recordType;
private ARecordType metaType;
private ILSMMergePolicyFactory mergePolicyFactory;
private Map<String, String> mergePolicyProperties;
- private int[] filterFields;
private int primaryIndexNumOfTupleFields;
- private IBinaryComparatorFactory[] primaryIndexComparatorFactories;
private ITypeTraits[] primaryIndexTypeTraits;
private ISerializerDeserializer<?>[] primaryIndexSerdes;
- private int[] primaryIndexBloomFilterKeyFields;
- private ITypeTraits[] filterTypeTraits;
- private IBinaryComparatorFactory[] filterCmpFactories;
- private int[] btreeFields;
private ConstantFileSplitProvider fileSplitProvider;
private RecordDescriptor rDesc;
private int[] primaryIndexInsertFieldsPermutations;
private int[] primaryKeyIndexes;
- private List<List<String>> keyFieldNames;
- private List<Integer> keyFieldSourceIndicators;
- private List<IAType> keyFieldTypes;
private Index index;
- private IStorageComponentProvider storageComponentProvider;
public PrimaryIndexInfo(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType,
ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
- int[] filterFields, int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
- IStorageComponentProvider storageComponentProvider) throws AlgebricksException {
- this.storageComponentProvider = storageComponentProvider;
- this.dataset = dataset;
+ int[] filterFields, int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators)
+ throws AlgebricksException {
this.primaryKeyTypes = primaryKeyTypes;
this.recordType = recordType;
this.metaType = metaType;
this.mergePolicyFactory = mergePolicyFactory;
this.mergePolicyProperties = mergePolicyProperties;
- this.filterFields = filterFields;
this.primaryKeyIndexes = primaryKeyIndexes;
primaryIndexNumOfTupleFields = primaryKeyTypes.length + (1 + ((metaType == null) ? 0 : 1));
primaryIndexTypeTraits =
createPrimaryIndexTypeTraits(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType, metaType);
- primaryIndexComparatorFactories = createPrimaryIndexComparatorFactories(primaryKeyTypes);
- primaryIndexBloomFilterKeyFields = createPrimaryIndexBloomFilterFields(primaryKeyTypes.length);
- filterTypeTraits = DatasetUtil.computeFilterTypeTraits(dataset, recordType);
- filterCmpFactories = DatasetUtil.computeFilterBinaryComparatorFactories(dataset, recordType,
- NonTaggedDataFormat.INSTANCE.getBinaryComparatorFactoryProvider());
- btreeFields = DatasetUtil.createBTreeFieldsWhenThereisAFilter(dataset);
primaryIndexSerdes =
createPrimaryIndexSerdes(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType, metaType);
rDesc = new RecordDescriptor(primaryIndexSerdes, primaryIndexTypeTraits);
@@ -392,23 +341,22 @@
for (int i = 0; i < primaryIndexNumOfTupleFields; i++) {
primaryIndexInsertFieldsPermutations[i] = i;
}
- keyFieldSourceIndicators = primaryKeyIndicators;
- keyFieldNames = new ArrayList<>();
- keyFieldTypes = Arrays.asList(primaryKeyTypes);
- for (int i = 0; i < keyFieldSourceIndicators.size(); i++) {
- Integer indicator = keyFieldSourceIndicators.get(i);
+ List<List<String>> keyFieldNames = new ArrayList<>();
+ List<IAType> keyFieldTypes = Arrays.asList(primaryKeyTypes);
+ for (int i = 0; i < primaryKeyIndicators.size(); i++) {
+ Integer indicator = primaryKeyIndicators.get(i);
String[] fieldNames =
indicator == Index.RECORD_INDICATOR ? recordType.getFieldNames() : metaType.getFieldNames();
keyFieldNames.add(Arrays.asList(fieldNames[primaryKeyIndexes[i]]));
}
index = new Index(dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName(),
- IndexType.BTREE, keyFieldNames, keyFieldSourceIndicators, keyFieldTypes, false, false, true,
+ IndexType.BTREE, keyFieldNames, primaryKeyIndicators, keyFieldTypes, false, false, true,
MetadataUtil.PENDING_NO_OP);
List<String> nodes = Collections.singletonList(ExecutionTestUtil.integrationUtil.ncs[0].getId());
- FileSplit[] splits = SplitsAndConstraintsUtil.getIndexSplits(
- ((ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext())
- .getClusterStateManager(),
- dataset, index.getIndexName(), nodes);
+ CcApplicationContext appCtx =
+ (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext();
+ FileSplit[] splits = SplitsAndConstraintsUtil.getIndexSplits(appCtx.getClusterStateManager(), dataset,
+ index.getIndexName(), nodes);
fileSplitProvider = new ConstantFileSplitProvider(Arrays.copyOfRange(splits, 0, 1));
}
@@ -436,6 +384,10 @@
.thenReturn(searcgRecDesc);
return rDescProvider;
}
+
+ public ConstantFileSplitProvider getFileSplitProvider() {
+ return fileSplitProvider;
+ }
}
public RecordDescriptor getSearchOutputDesc(IAType[] keyTypes, ARecordType recordType, ARecordType metaType) {
@@ -450,18 +402,12 @@
public IndexDataflowHelperFactory getPrimaryIndexDataflowHelperFactory(PrimaryIndexInfo primaryIndexInfo,
IStorageComponentProvider storageComponentProvider) throws AlgebricksException {
return new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
- primaryIndexInfo.fileSplitProvider);
+ primaryIndexInfo.getFileSplitProvider());
}
- public IIndexDataflowHelper getPrimaryIndexDataflowHelper(Dataset dataset, IAType[] primaryKeyTypes,
- ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory,
- Map<String, String> mergePolicyProperties, int[] filterFields,
- IStorageComponentProvider storageComponentProvider, int[] primaryKeyIndexes,
- List<Integer> primaryKeyIndicators) throws AlgebricksException, HyracksDataException {
- PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
- mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators,
- storageComponentProvider);
- return getPrimaryIndexDataflowHelperFactory(primaryIndexInfo, storageComponentProvider)
- .create(createTestContext(true).getJobletContext().getServiceContext(), PARTITION);
+ public IStorageManager getStorageManager() {
+ CcApplicationContext appCtx =
+ (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext();
+ return appCtx.getStorageManager();
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
new file mode 100644
index 0000000..76bec8c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -0,0 +1,889 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.Predicate;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
+import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.app.nc.NCAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.file.StorageComponentProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.test.CountAnswer;
+import org.apache.hyracks.api.test.FrameWriterTestUtils;
+import org.apache.hyracks.api.test.FrameWriterTestUtils.FrameWriterOperation;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ComponentRollbackTest {
+
+ private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
+ private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
+ new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
+ private static final GenerationFunction[] RECORD_GEN_FUNCTION =
+ { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC };
+ private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
+ private static final ARecordType META_TYPE = null;
+ private static final GenerationFunction[] META_GEN_FUNCTION = null;
+ private static final boolean[] UNIQUE_META_FIELDS = null;
+ private static final int[] KEY_INDEXES = { 0 };
+ private static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR };
+ private static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
+ private static final int TOTAL_NUM_OF_RECORDS = 10000;
+ private static final int RECORDS_PER_COMPONENT = 1000;
+ private static final int DATASET_ID = 101;
+ private static final String DATAVERSE_NAME = "TestDV";
+ private static final String DATASET_NAME = "TestDS";
+ private static final String DATA_TYPE_NAME = "DUMMY";
+ private static final String NODE_GROUP_NAME = "DEFAULT";
+ private static final Predicate<ILSMComponent> memoryComponentsPredicate = c -> c instanceof ILSMMemoryComponent;
+ private static final StorageComponentProvider storageManager = new StorageComponentProvider();
+ private static TestNodeController nc;
+ private static TestLsmBtree lsmBtree;
+ private static NCAppRuntimeContext ncAppCtx;
+ private static IDatasetLifecycleManager dsLifecycleMgr;
+ private static Dataset dataset;
+ private static IHyracksTaskContext ctx;
+ private static IIndexDataflowHelper indexDataflowHelper;
+ private static ITransactionContext txnCtx;
+ private static LSMInsertDeleteOperatorNodePushable insertOp;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ System.out.println("SetUp: ");
+ TestHelper.deleteExistingInstanceFiles();
+ nc = new TestNodeController(null, false);
+ nc.init();
+ ncAppCtx = nc.getAppRuntimeContext();
+ dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ System.out.println("TearDown");
+ nc.deInit();
+ TestHelper.deleteExistingInstanceFiles();
+ }
+
+ @Before
+ public void createIndex() throws Exception {
+ List<List<String>> partitioningKeys = new ArrayList<>();
+ partitioningKeys.add(Collections.singletonList("key"));
+ dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
+ NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+ partitioningKeys, null, null, null, false, null, false),
+ null, DatasetType.INTERNAL, DATASET_ID, 0);
+ PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,
+ new NoMergePolicyFactory(), null, null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
+ IndexDataflowHelperFactory iHelperFactory =
+ new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
+ ctx = nc.createTestContext(false);
+ indexDataflowHelper = iHelperFactory.create(ctx.getJobletContext().getServiceContext(), 0);
+ indexDataflowHelper.open();
+ lsmBtree = (TestLsmBtree) indexDataflowHelper.getIndexInstance();
+ indexDataflowHelper.close();
+ nc.newJobId();
+ txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
+ insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(),
+ null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager).getLeft();
+ }
+
+ @After
+ public void destroyIndex() throws Exception {
+ indexDataflowHelper.destroy();
+ }
+
+ private void allowAllOps(TestLsmBtree lsmBtree) {
+ lsmBtree.addModifyCallback(sem -> sem.release());
+ lsmBtree.addFlushCallback(sem -> sem.release());
+ lsmBtree.addSearchCallback(sem -> sem.release());
+ lsmBtree.addMergeCallback(sem -> sem.release());
+ }
+
+ @Test
+ public void testRollbackWhileNoOp() {
+ try {
+ // allow all operations
+ allowAllOps(lsmBtree);
+ insertOp.open();
+ TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ VSizeFrame frame = new VSizeFrame(ctx);
+ FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+ for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ // flush every 1000 records
+ if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+ }
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ }
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ insertOp.close();
+ nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+
+ // get all components
+ List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+ List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+ Assert.assertEquals(9, diskComponents.size());
+ Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ ILSMIndexAccessor lsmAccessor =
+ lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ // rollback a memory component
+ lsmAccessor.deleteComponents(memoryComponentsPredicate);
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+ // rollback the last disk component
+ lsmAccessor = lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
+ DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
+ lsmAccessor.deleteComponents(pred);
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRollbackThenInsert() {
+ try {
+ // allow all operations
+ allowAllOps(lsmBtree);
+ insertOp.open();
+ TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ VSizeFrame frame = new VSizeFrame(ctx);
+ FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+ for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ // flush every 1000 records
+ if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+ }
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ }
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ insertOp.close();
+ nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+
+ // get all components
+ List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+ List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+ Assert.assertEquals(9, diskComponents.size());
+ Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ ILSMIndexAccessor lsmAccessor =
+ lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ // rollback a memory component
+ lsmAccessor.deleteComponents(memoryComponentsPredicate);
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+
+ // insert again
+ nc.newJobId();
+ txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
+ insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(),
+ null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager).getLeft();
+ insertOp.open();
+ for (int j = 0; j < RECORDS_PER_COMPONENT; j++) {
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ }
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ insertOp.close();
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ // rollback the last disk component
+ lsmAccessor = lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
+ DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
+ lsmAccessor.deleteComponents(pred);
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRollbackWhileSearch() {
+ try {
+ // allow all operations but search
+ allowAllOps(lsmBtree);
+ lsmBtree.clearSearchCallbacks();
+ insertOp.open();
+ TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ VSizeFrame frame = new VSizeFrame(ctx);
+ FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+ for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ // flush every 1000 records
+ if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+ }
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ }
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ insertOp.close();
+ nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+
+ // get all components
+ List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+ List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+ Assert.assertEquals(9, diskComponents.size());
+ Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ Searcher firstSearcher = new Searcher(nc, ctx, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+ // wait till firstSearcher enter the components
+ firstSearcher.waitUntilEntered();
+ // now that we enetered, we will rollback
+ ILSMIndexAccessor lsmAccessor =
+ lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ // rollback a memory component
+ lsmAccessor.deleteComponents(
+ c -> (c instanceof ILSMMemoryComponent && ((ILSMMemoryComponent) c).isModified()));
+ // now that the rollback has completed, we will unblock the search
+ lsmBtree.addSearchCallback(sem -> sem.release());
+ lsmBtree.allowSearch(1);
+ Assert.assertTrue(firstSearcher.result());
+ // search now and ensure
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+ // rollback the last disk component
+ // re-block searches
+ lsmBtree.clearSearchCallbacks();
+ Searcher secondSearcher = new Searcher(nc, ctx, dataset, storageManager, lsmBtree,
+ TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+ // wait till firstSearcher enter the components
+ secondSearcher.waitUntilEntered();
+ lsmAccessor = lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
+ DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
+ lsmAccessor.deleteComponents(pred);
+ // now that the rollback has completed, we will unblock the search
+ lsmBtree.addSearchCallback(sem -> sem.release());
+ lsmBtree.allowSearch(1);
+ Assert.assertTrue(secondSearcher.result());
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRollbackWhileFlush() {
+ try {
+ // allow all operations
+ allowAllOps(lsmBtree);
+ insertOp.open();
+ TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ VSizeFrame frame = new VSizeFrame(ctx);
+ FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+ for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ // flush every 1000 records
+ if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+ }
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ }
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ insertOp.close();
+ nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+ // get all components
+ List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+ List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+ Assert.assertEquals(9, diskComponents.size());
+ Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ // disable flushes
+ lsmBtree.clearFlushCallbacks();
+ Flusher firstFlusher = new Flusher(lsmBtree);
+ dsLifecycleMgr.flushDataset(dataset.getDatasetId(), true);
+ firstFlusher.waitUntilCount(1);
+ // now that we enetered, we will rollback. This will not proceed since it is waiting for the flush to complete
+ Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
+ // now that the rollback has completed, we will search
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ //unblock the flush
+ lsmBtree.allowFlush(1);
+ // ensure rollback completed
+ rollerback.complete();
+ // ensure current mem component is not modified
+ Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ // search now and ensure
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRollbackWhileMerge() {
+ try {
+ // allow all operations but merge
+ allowAllOps(lsmBtree);
+ lsmBtree.clearMergeCallbacks();
+ insertOp.open();
+ TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ VSizeFrame frame = new VSizeFrame(ctx);
+ FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+ for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ // flush every 1000 records
+ if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+ }
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ }
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ insertOp.close();
+ nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+ // get all components
+ List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+ List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+ Assert.assertEquals(9, diskComponents.size());
+ Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ // Now, we will start a full merge
+ Merger merger = new Merger(lsmBtree);
+ ILSMIndexAccessor mergeAccessor =
+ lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ // select the components to merge... the last three
+ int numMergedComponents = 3;
+ List<ILSMDiskComponent> mergedComponents = new ArrayList<>();
+ long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
+ for (int i = 0; i < numMergedComponents; i++) {
+ mergedComponents.add(diskComponents.get(i));
+ }
+ mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents);
+ merger.waitUntilCount(1);
+ // now that we enetered, we will rollback
+ Rollerback rollerback = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn));
+ // rollback is now waiting for the merge to complete
+ // we will search
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ //unblock the merge
+ lsmBtree.allowMerge(1);
+ // ensure rollback completes
+ rollerback.complete();
+ // ensure current mem component is not modified
+ Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ // search now and ensure that we rolled back the merged component
+ searchAndAssertCount(nc, ctx, dataset, storageManager,
+ TOTAL_NUM_OF_RECORDS - ((numMergedComponents + 1/*memory component*/) * RECORDS_PER_COMPONENT));
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRollbackWhileFlushAndSearchFlushExistsFirst() {
+ try {
+ // allow all operations
+ allowAllOps(lsmBtree);
+ insertOp.open();
+ TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ VSizeFrame frame = new VSizeFrame(ctx);
+ FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+ for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ // flush every 1000 records
+ if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+ }
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ }
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ insertOp.close();
+ nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+ // get all components
+ List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+ List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+ Assert.assertEquals(9, diskComponents.size());
+ Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ // disable flushes
+ // disable searches
+ lsmBtree.clearFlushCallbacks();
+ lsmBtree.clearSearchCallbacks();
+ Flusher firstFlusher = new Flusher(lsmBtree);
+ dsLifecycleMgr.flushDataset(dataset.getDatasetId(), true);
+ firstFlusher.waitUntilCount(1);
+ Searcher firstSearcher = new Searcher(nc, ctx, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+ // wait till firstSearcher enter the components
+ firstSearcher.waitUntilEntered();
+ // now that we enetered, we will rollback rollback a memory component
+ Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
+ //unblock the flush
+ lsmBtree.allowFlush(1);
+ lsmBtree.addSearchCallback(sem -> sem.release());
+ lsmBtree.allowSearch(1);
+ Assert.assertTrue(firstSearcher.result());
+ // ensure current mem component is not modified
+ rollerback.complete();
+ Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ // search now and ensure the rollback was no op since it waits for ongoing flushes
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRollbackWhileFlushAndSearchSearchExistsFirst() {
+ try {
+ // allow all operations
+ allowAllOps(lsmBtree);
+ insertOp.open();
+ TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ VSizeFrame frame = new VSizeFrame(ctx);
+ FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+ for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ // flush every 1000 records
+ if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+ }
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ }
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ insertOp.close();
+ nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+ // get all components
+ List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+ List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+ Assert.assertEquals(9, diskComponents.size());
+ Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ // disable flushes
+ // disable searches
+ lsmBtree.clearFlushCallbacks();
+ Flusher firstFlusher = new Flusher(lsmBtree);
+ dsLifecycleMgr.flushDataset(dataset.getDatasetId(), true);
+ firstFlusher.waitUntilCount(1);
+ lsmBtree.clearSearchCallbacks();
+ Searcher firstSearcher = new Searcher(nc, ctx, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+ // wait till firstSearcher enter the components
+ firstSearcher.waitUntilEntered();
+ // now that we enetered, we will rollback
+ Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
+ // The rollback will be waiting for the flush to complete
+ lsmBtree.addSearchCallback(sem -> sem.release());
+ lsmBtree.allowSearch(1);
+ Assert.assertTrue(firstSearcher.result());
+ //unblock the flush
+ lsmBtree.allowFlush(1);
+ // ensure current mem component is not modified
+ rollerback.complete();
+ Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ // search now and ensure
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRollbackWhileMergeAndSearchMergeExitsFirst() {
+ try {
+ // allow all operations except merge
+ allowAllOps(lsmBtree);
+ lsmBtree.clearMergeCallbacks();
+ insertOp.open();
+ TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ VSizeFrame frame = new VSizeFrame(ctx);
+ FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+ for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ // flush every 1000 records
+ if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+ }
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ }
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ insertOp.close();
+ nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+ // get all components
+ List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+ List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+ Assert.assertEquals(9, diskComponents.size());
+ Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ // Now, we will start a merge
+ Merger merger = new Merger(lsmBtree);
+ ILSMIndexAccessor mergeAccessor =
+ lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ // select the components to merge... the last three
+ int numMergedComponents = 3;
+ List<ILSMDiskComponent> mergedComponents = new ArrayList<>();
+ long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
+ for (int i = 0; i < numMergedComponents; i++) {
+ mergedComponents.add(diskComponents.get(i));
+ }
+ mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents);
+ merger.waitUntilCount(1);
+ // we will block search
+ lsmBtree.clearSearchCallbacks();
+ Searcher firstSearcher = new Searcher(nc, ctx, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+ // wait till firstSearcher enter the components
+ firstSearcher.waitUntilEntered();
+ // now that we enetered, we will rollback
+ Rollerback rollerback = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn));
+ // the rollback is waiting for all flushes and merges to complete before it proceeds
+ // unblock the merge
+ lsmBtree.allowMerge(1);
+ // unblock the search
+ lsmBtree.addSearchCallback(sem -> sem.release());
+ lsmBtree.allowSearch(1);
+ Assert.assertTrue(firstSearcher.result());
+ rollerback.complete();
+ // now that the rollback has completed, we will search
+ searchAndAssertCount(nc, ctx, dataset, storageManager,
+ TOTAL_NUM_OF_RECORDS - ((numMergedComponents + 1/*memory component*/) * RECORDS_PER_COMPONENT));
+ // ensure current mem component is not modified
+ Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRollbackWhileMergeAndSearchSearchExitsFirst() {
+ try {
+ // allow all operations except merge
+ allowAllOps(lsmBtree);
+ lsmBtree.clearMergeCallbacks();
+ insertOp.open();
+ TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ VSizeFrame frame = new VSizeFrame(ctx);
+ FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+ for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ // flush every 1000 records
+ if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+ }
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ }
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ insertOp.close();
+ nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+ // get all components
+ List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+ List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+ Assert.assertEquals(9, diskComponents.size());
+ Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ // Now, we will start a merge
+ Merger merger = new Merger(lsmBtree);
+ ILSMIndexAccessor mergeAccessor =
+ lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ // select the components to merge... the last three
+ List<ILSMDiskComponent> mergedComponents = new ArrayList<>();
+ long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
+ int numMergedComponents = 3;
+ for (int i = 0; i < numMergedComponents; i++) {
+ mergedComponents.add(diskComponents.get(i));
+ }
+ mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents);
+ merger.waitUntilCount(1);
+ // we will block search
+ lsmBtree.clearSearchCallbacks();
+ Searcher firstSearcher = new Searcher(nc, ctx, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+ // wait till firstSearcher enter the components
+ firstSearcher.waitUntilEntered();
+ // now that we enetered, we will rollback
+ Rollerback rollerBack = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn));
+ // unblock the search
+ lsmBtree.addSearchCallback(sem -> sem.release());
+ lsmBtree.allowSearch(1);
+ Assert.assertTrue(firstSearcher.result());
+ // even though rollback has been called, it is still waiting for the merge to complete
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ //unblock the merge
+ lsmBtree.allowMerge(1);
+ rollerBack.complete();
+ searchAndAssertCount(nc, ctx, dataset, storageManager,
+ TOTAL_NUM_OF_RECORDS - ((numMergedComponents + 1/*memory component*/) * RECORDS_PER_COMPONENT));
+ // ensure current mem component is not modified
+ Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ private class Rollerback {
+ private Thread task;
+ private Exception failure;
+
+ public Rollerback(TestLsmBtree lsmBtree, Predicate<ILSMComponent> predicate) {
+ // now that we enetered, we will rollback
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ ILSMIndexAccessor lsmAccessor =
+ lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ try {
+ lsmAccessor.deleteComponents(predicate);
+ } catch (HyracksDataException e) {
+ failure = e;
+ }
+ }
+ };
+ task = new Thread(runnable);
+ task.start();
+ }
+
+ void complete() throws Exception {
+ task.join();
+ if (failure != null) {
+ throw failure;
+ }
+ }
+ }
+
+ private class Searcher {
+ private ExecutorService executor = Executors.newSingleThreadExecutor();
+ private Future<Boolean> task;
+ private volatile boolean entered = false;
+
+ public Searcher(TestNodeController nc, IHyracksTaskContext ctx, Dataset dataset,
+ StorageComponentProvider storageManager, TestLsmBtree lsmBtree, int numOfRecords) {
+ lsmBtree.addSearchCallback(sem -> {
+ synchronized (Searcher.this) {
+ entered = true;
+ Searcher.this.notifyAll();
+ }
+ });
+ Callable<Boolean> callable = new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ searchAndAssertCount(nc, ctx, dataset, storageManager, numOfRecords);
+ return true;
+ }
+ };
+ task = executor.submit(callable);
+ }
+
+ boolean result() throws Exception {
+ return task.get();
+ }
+
+ synchronized void waitUntilEntered() throws InterruptedException {
+ while (!entered) {
+ this.wait();
+ }
+ }
+ }
+
+ private class Merger {
+ private volatile int count = 0;
+
+ public Merger(TestLsmBtree lsmBtree) {
+ lsmBtree.addMergeCallback(sem -> {
+ synchronized (Merger.this) {
+ count++;
+ Merger.this.notifyAll();
+ }
+ });
+ }
+
+ synchronized void waitUntilCount(int count) throws InterruptedException {
+ while (this.count != count) {
+ this.wait();
+ }
+ }
+ }
+
+ private class Flusher {
+ private volatile int count = 0;
+
+ public Flusher(TestLsmBtree lsmBtree) {
+ lsmBtree.addFlushCallback(sem -> {
+ synchronized (Flusher.this) {
+ count++;
+ Flusher.this.notifyAll();
+ }
+ });
+ }
+
+ synchronized void waitUntilCount(int count) throws InterruptedException {
+ while (this.count != count) {
+ this.wait();
+ }
+ }
+ }
+
+ private class DiskComponentLsnPredicate implements Predicate<ILSMComponent> {
+ private final long lsn;
+
+ public DiskComponentLsnPredicate(long lsn) {
+ this.lsn = lsn;
+ }
+
+ @Override
+ public boolean test(ILSMComponent c) {
+ try {
+ return c instanceof ILSMMemoryComponent
+ || (c instanceof ILSMDiskComponent && AbstractLSMIOOperationCallback
+ .getTreeIndexLSN(((ILSMDiskComponent) c).getMetadata()) >= lsn);
+ } catch (HyracksDataException e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+ }
+
+ private void searchAndAssertCount(TestNodeController nc, IHyracksTaskContext ctx, Dataset dataset,
+ StorageComponentProvider storageManager, int numOfRecords)
+ throws HyracksDataException, AlgebricksException {
+ nc.newJobId();
+ TestTupleCounterFrameWriter countOp = create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE),
+ Collections.emptyList(), Collections.emptyList(), false);
+ IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,
+ new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager);
+ emptyTupleOp.open();
+ emptyTupleOp.close();
+ Assert.assertEquals(numOfRecords, countOp.getCount());
+ }
+
+ public static TestTupleCounterFrameWriter create(RecordDescriptor recordDescriptor,
+ Collection<FrameWriterOperation> exceptionThrowingOperations,
+ Collection<FrameWriterOperation> errorThrowingOperations, boolean deepCopyInputFrames) {
+ CountAnswer openAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Open,
+ exceptionThrowingOperations, errorThrowingOperations);
+ CountAnswer nextAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.NextFrame,
+ exceptionThrowingOperations, errorThrowingOperations);
+ CountAnswer flushAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Flush,
+ exceptionThrowingOperations, errorThrowingOperations);
+ CountAnswer failAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Fail,
+ exceptionThrowingOperations, errorThrowingOperations);
+ CountAnswer closeAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Close,
+ exceptionThrowingOperations, errorThrowingOperations);
+ return new TestTupleCounterFrameWriter(recordDescriptor, openAnswer, nextAnswer, flushAnswer, failAnswer,
+ closeAnswer, deepCopyInputFrames);
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index 1e3960f..a0e3aa9 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -25,6 +25,7 @@
import java.util.List;
import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
import org.apache.asterix.app.data.gen.TupleGenerator;
import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
@@ -58,9 +59,10 @@
import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
import org.apache.hyracks.dataflow.common.utils.TaskUtil;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
-import org.apache.hyracks.storage.am.lsm.common.utils.ComponentMetadataUtil;
+import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -113,8 +115,8 @@
partitioningKeys, null, null, null, false, null, false),
null, DatasetType.INTERNAL, DATASET_ID, 0);
try {
- nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null,
- null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
+ PrimaryIndexInfo indexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,
+ new NoMergePolicyFactory(), null, null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
IHyracksTaskContext ctx = nc.createTestContext(true);
nc.newJobId();
ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
@@ -147,13 +149,14 @@
}
insertOp.close();
nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
- IIndexDataflowHelper dataflowHelper = nc.getPrimaryIndexDataflowHelper(dataset, KEY_TYPES, RECORD_TYPE,
- META_TYPE, new NoMergePolicyFactory(), null, null, storageManager, KEY_INDEXES,
- KEY_INDICATORS_LIST);
+ IndexDataflowHelperFactory iHelperFactory =
+ new IndexDataflowHelperFactory(nc.getStorageManager(), indexInfo.getFileSplitProvider());
+ IIndexDataflowHelper dataflowHelper =
+ iHelperFactory.create(ctx.getJobletContext().getServiceContext(), 0);
dataflowHelper.open();
LSMBTree btree = (LSMBTree) dataflowHelper.getIndexInstance();
LongPointable longPointable = LongPointable.FACTORY.createPointable();
- ComponentMetadataUtil.get(btree, ComponentMetadataUtil.MARKER_LSN_KEY, longPointable);
+ ComponentUtils.get(btree, ComponentUtils.MARKER_LSN_KEY, longPointable);
long lsn = longPointable.getLong();
int numOfMarkers = 0;
LogReader logReader = (LogReader) nc.getTransactionSubsystem().getLogManager().getLogReader(false);
@@ -204,4 +207,4 @@
return new TestTupleCounterFrameWriter(recordDescriptor, openAnswer, nextAnswer, flushAnswer, failAnswer,
closeAnswer, deepCopyInputFrames);
}
-}
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
new file mode 100644
index 0000000..893b428
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.util.Map;
+
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.metadata.IDatasetDetails;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.common.IResourceFactory;
+
+public class TestDataset extends Dataset {
+
+ private static final long serialVersionUID = 1L;
+
+ public TestDataset(String dataverseName, String datasetName, String recordTypeDataverseName, String recordTypeName,
+ String nodeGroupName, String compactionPolicy, Map<String, String> compactionPolicyProperties,
+ IDatasetDetails datasetDetails, Map<String, String> hints, DatasetType datasetType, int datasetId,
+ int pendingOp) {
+ super(dataverseName, datasetName, recordTypeDataverseName, recordTypeName, nodeGroupName, compactionPolicy,
+ compactionPolicyProperties, datasetDetails, hints, datasetType, datasetId, pendingOp);
+ }
+
+ @Override
+ public IResourceFactory getResourceFactory(MetadataProvider mdProvider, Index index, ARecordType recordType,
+ ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties)
+ throws AlgebricksException {
+ ITypeTraits[] filterTypeTraits = DatasetUtil.computeFilterTypeTraits(this, recordType);
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtil.computeFilterBinaryComparatorFactories(this,
+ recordType, mdProvider.getStorageComponentProvider().getComparatorFactoryProvider());
+ IResourceFactory resourceFactory =
+ TestLsmBTreeResourceFactoryProvider.INSTANCE.getResourceFactory(mdProvider, this, index, recordType,
+ metaType, mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories);
+ return new DatasetLocalResourceFactory(getDatasetId(), resourceFactory);
+ }
+
+ @Override
+ public ILSMIOOperationCallbackFactory getIoOperationCallbackFactory(Index index) throws AlgebricksException {
+ return TestLsmBtreeIoOpCallbackFactory.INSTANCE;
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBTreeResourceFactoryProvider.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBTreeResourceFactoryProvider.java
new file mode 100644
index 0000000..4511f42
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBTreeResourceFactoryProvider.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.external.indexing.FilesIndexDescription;
+import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.metadata.api.IResourceFactoryProvider;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.utils.IndexUtil;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory;
+import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtreeLocalResourceFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import org.apache.hyracks.storage.common.IResourceFactory;
+import org.apache.hyracks.storage.common.IStorageManager;
+
+public class TestLsmBTreeResourceFactoryProvider implements IResourceFactoryProvider {
+
+ public static final TestLsmBTreeResourceFactoryProvider INSTANCE = new TestLsmBTreeResourceFactoryProvider();
+
+ private TestLsmBTreeResourceFactoryProvider() {
+ }
+
+ @Override
+ public IResourceFactory getResourceFactory(MetadataProvider mdProvider, Dataset dataset, Index index,
+ ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory,
+ Map<String, String> mergePolicyProperties, ITypeTraits[] filterTypeTraits,
+ IBinaryComparatorFactory[] filterCmpFactories) throws AlgebricksException {
+ int[] filterFields = IndexUtil.getFilterFields(dataset, index, filterTypeTraits);
+ int[] btreeFields = IndexUtil.getBtreeFieldsIfFiltered(dataset, index);
+ IStorageComponentProvider storageComponentProvider = mdProvider.getStorageComponentProvider();
+ ITypeTraits[] typeTraits = getTypeTraits(mdProvider, dataset, index, recordType, metaType);
+ IBinaryComparatorFactory[] cmpFactories = getCmpFactories(mdProvider, dataset, index, recordType, metaType);
+ int[] bloomFilterFields = getBloomFilterFields(dataset, index);
+ boolean durable = !dataset.isTemp();
+ double bloomFilterFalsePositiveRate = mdProvider.getStorageProperties().getBloomFilterFalsePositiveRate();
+ ILSMOperationTrackerFactory opTrackerFactory = dataset.getIndexOperationTrackerFactory(index);
+ ILSMIOOperationCallbackFactory ioOpCallbackFactory = dataset.getIoOperationCallbackFactory(index);
+ IStorageManager storageManager = storageComponentProvider.getStorageManager();
+ IMetadataPageManagerFactory metadataPageManagerFactory =
+ storageComponentProvider.getMetadataPageManagerFactory();
+ ILSMIOOperationSchedulerProvider ioSchedulerProvider =
+ storageComponentProvider.getIoOperationSchedulerProvider();
+ AsterixVirtualBufferCacheProvider vbcProvider = new AsterixVirtualBufferCacheProvider(dataset.getDatasetId());
+ return new TestLsmBtreeLocalResourceFactory(storageManager, typeTraits, cmpFactories, filterTypeTraits,
+ filterCmpFactories, filterFields, opTrackerFactory, ioOpCallbackFactory, metadataPageManagerFactory,
+ vbcProvider, ioSchedulerProvider, mergePolicyFactory, mergePolicyProperties, durable, bloomFilterFields,
+ bloomFilterFalsePositiveRate, index.isPrimaryIndex(), btreeFields);
+ }
+
+ private static ITypeTraits[] getTypeTraits(MetadataProvider metadataProvider, Dataset dataset, Index index,
+ ARecordType recordType, ARecordType metaType) throws AlgebricksException {
+ ITypeTraits[] primaryTypeTraits = dataset.getPrimaryTypeTraits(metadataProvider, recordType, metaType);
+ if (index.isPrimaryIndex()) {
+ return primaryTypeTraits;
+ } else if (dataset.getDatasetType() == DatasetType.EXTERNAL
+ && index.getIndexName().equals(IndexingConstants.getFilesIndexName(dataset.getDatasetName()))) {
+ return FilesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS;
+ }
+ int numPrimaryKeys = dataset.getPrimaryKeys().size();
+ int numSecondaryKeys = index.getKeyFieldNames().size();
+ ITypeTraitProvider typeTraitProvider = metadataProvider.getStorageComponentProvider().getTypeTraitProvider();
+ ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
+ for (int i = 0; i < numSecondaryKeys; i++) {
+ ARecordType sourceType;
+ List<Integer> keySourceIndicators = index.getKeyFieldSourceIndicators();
+ if (keySourceIndicators == null || keySourceIndicators.get(i) == 0) {
+ sourceType = recordType;
+ } else {
+ sourceType = metaType;
+ }
+ Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(i),
+ index.getKeyFieldNames().get(i), sourceType);
+ IAType keyType = keyTypePair.first;
+ secondaryTypeTraits[i] = typeTraitProvider.getTypeTrait(keyType);
+ }
+ // Add serializers and comparators for primary index fields.
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ secondaryTypeTraits[numSecondaryKeys + i] = primaryTypeTraits[i];
+ }
+ return secondaryTypeTraits;
+ }
+
+ private static IBinaryComparatorFactory[] getCmpFactories(MetadataProvider metadataProvider, Dataset dataset,
+ Index index, ARecordType recordType, ARecordType metaType) throws AlgebricksException {
+ IBinaryComparatorFactory[] primaryCmpFactories =
+ dataset.getPrimaryComparatorFactories(metadataProvider, recordType, metaType);
+ if (index.isPrimaryIndex()) {
+ return dataset.getPrimaryComparatorFactories(metadataProvider, recordType, metaType);
+ } else if (dataset.getDatasetType() == DatasetType.EXTERNAL
+ && index.getIndexName().equals(IndexingConstants.getFilesIndexName(dataset.getDatasetName()))) {
+ return FilesIndexDescription.FILES_INDEX_COMP_FACTORIES;
+ }
+ int numPrimaryKeys = dataset.getPrimaryKeys().size();
+ int numSecondaryKeys = index.getKeyFieldNames().size();
+ IBinaryComparatorFactoryProvider cmpFactoryProvider =
+ metadataProvider.getStorageComponentProvider().getComparatorFactoryProvider();
+ IBinaryComparatorFactory[] secondaryCmpFactories =
+ new IBinaryComparatorFactory[numSecondaryKeys + numPrimaryKeys];
+ for (int i = 0; i < numSecondaryKeys; i++) {
+ ARecordType sourceType;
+ List<Integer> keySourceIndicators = index.getKeyFieldSourceIndicators();
+ if (keySourceIndicators == null || keySourceIndicators.get(i) == 0) {
+ sourceType = recordType;
+ } else {
+ sourceType = metaType;
+ }
+ Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(i),
+ index.getKeyFieldNames().get(i), sourceType);
+ IAType keyType = keyTypePair.first;
+ secondaryCmpFactories[i] = cmpFactoryProvider.getBinaryComparatorFactory(keyType, true);
+ }
+ // Add serializers and comparators for primary index fields.
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ secondaryCmpFactories[numSecondaryKeys + i] = primaryCmpFactories[i];
+ }
+ return secondaryCmpFactories;
+ }
+
+ private static int[] getBloomFilterFields(Dataset dataset, Index index) throws AlgebricksException {
+ if (index.isPrimaryIndex()) {
+ return dataset.getPrimaryBloomFilterFields();
+ } else if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+ if (index.getIndexName().equals(IndexingConstants.getFilesIndexName(dataset.getDatasetName()))) {
+ return FilesIndexDescription.BLOOM_FILTER_FIELDS;
+ } else {
+ return new int[] { index.getKeyFieldNames().size() };
+ }
+ }
+ int numKeys = index.getKeyFieldNames().size();
+ int[] bloomFilterKeyFields = new int[numKeys];
+ for (int i = 0; i < numKeys; i++) {
+ bloomFilterKeyFields[i] = i;
+ }
+ return bloomFilterKeyFields;
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
new file mode 100644
index 0000000..1bec41e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.am.lsm.common.impls.EmptyComponent;
+
+public class TestLsmBtreeIoOpCallbackFactory implements ILSMIOOperationCallbackFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ public static TestLsmBtreeIoOpCallbackFactory INSTANCE = new TestLsmBtreeIoOpCallbackFactory();
+ private static volatile int completedFlushes = 0;
+ private static volatile int completedMerges = 0;
+ private static volatile int rollbackFlushes = 0;
+ private static volatile int rollbackMerges = 0;
+ private static volatile int failedFlushes = 0;
+ private static volatile int failedMerges = 0;
+
+ private TestLsmBtreeIoOpCallbackFactory() {
+ }
+
+ @Override
+ public synchronized ILSMIOOperationCallback createIoOpCallback() {
+ completedFlushes = 0;
+ completedMerges = 0;
+ rollbackFlushes = 0;
+ rollbackMerges = 0;
+ // Whenever this is called, it resets the counter
+ // However, the counters for the failed operations are never reset since we expect them
+ // To be always 0
+ return new TestLsmBtreeIoOpCallback();
+ }
+
+ public int getTotalFlushes() {
+ return completedFlushes + rollbackFlushes;
+ }
+
+ public int getTotalMerges() {
+ return completedMerges + rollbackMerges;
+ }
+
+ public int getTotalIoOps() {
+ return getTotalFlushes() + getTotalMerges();
+ }
+
+ public int getRollbackFlushes() {
+ return rollbackFlushes;
+ }
+
+ public int getRollbackMerges() {
+ return rollbackMerges;
+ }
+
+ public int getCompletedFlushes() {
+ return completedFlushes;
+ }
+
+ public int getCompletedMerges() {
+ return completedMerges;
+ }
+
+ public static int getFailedFlushes() {
+ return failedFlushes;
+ }
+
+ public static int getFailedMerges() {
+ return failedMerges;
+ }
+
+ public class TestLsmBtreeIoOpCallback extends LSMBTreeIOOperationCallback {
+ @Override
+ public void afterFinalize(LSMOperationType opType, ILSMDiskComponent newComponent) {
+ super.afterFinalize(opType, newComponent);
+ synchronized (INSTANCE) {
+ if (newComponent != null) {
+ if (newComponent == EmptyComponent.INSTANCE) {
+ if (opType == LSMOperationType.FLUSH) {
+ rollbackFlushes++;
+ } else {
+ rollbackMerges++;
+ }
+ } else {
+ if (opType == LSMOperationType.FLUSH) {
+ completedFlushes++;
+ } else {
+ completedMerges++;
+ }
+ }
+ } else {
+ recordFailure(opType);
+ }
+ INSTANCE.notifyAll();
+ }
+ }
+
+ private void recordFailure(LSMOperationType opType) {
+ if (opType == LSMOperationType.FLUSH) {
+ failedFlushes++;
+ } else {
+ failedMerges++;
+ }
+ }
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
index 5bb9d49..a20e660 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
@@ -77,7 +77,7 @@
@Override
protected boolean scheduleMerge(final ILSMIndex index) throws HyracksDataException {
- List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents());
+ List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getDiskComponents());
// Reverse the components order so that we look at components from oldest to newest.
Collections.reverse(immutableComponents);
@@ -110,7 +110,7 @@
for (IndexInfo info : indexInfos) {
ILSMIndex lsmIndex = info.getIndex();
- List<ILSMDiskComponent> immutableComponents = new ArrayList<>(lsmIndex.getImmutableComponents());
+ List<ILSMDiskComponent> immutableComponents = new ArrayList<>(lsmIndex.getDiskComponents());
if (isMergeOngoing(immutableComponents)) {
continue;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index f357aea..ba0f928 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -34,7 +34,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMDiskComponentId;
-import org.apache.hyracks.storage.am.lsm.common.utils.ComponentMetadataUtil;
+import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
// A single LSMIOOperationCallback per LSM index used to perform actions around Flush and Merge operations
public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationCallback {
@@ -100,9 +100,9 @@
}
}
- public void putLSNIntoMetadata(ILSMDiskComponent index, List<ILSMComponent> oldComponents)
+ public void putLSNIntoMetadata(ILSMDiskComponent newComponent, List<ILSMComponent> oldComponents)
throws HyracksDataException {
- index.getMetadata().put(LSN_KEY, LongPointable.FACTORY.createPointable(getComponentLSN(oldComponents)));
+ newComponent.getMetadata().put(LSN_KEY, LongPointable.FACTORY.createPointable(getComponentLSN(oldComponents)));
}
public static long getTreeIndexLSN(DiskComponentMetadata md) throws HyracksDataException {
@@ -188,10 +188,11 @@
putLSNIntoMetadata(newComponent, oldComponents);
putComponentIdIntoMetadata(newComponent, oldComponents);
if (opType == LSMOperationType.MERGE) {
- LongPointable markerLsn = LongPointable.FACTORY
- .createPointable(ComponentMetadataUtil.getLong(oldComponents.get(0).getMetadata(),
- ComponentMetadataUtil.MARKER_LSN_KEY, ComponentMetadataUtil.NOT_FOUND));
- newComponent.getMetadata().put(ComponentMetadataUtil.MARKER_LSN_KEY, markerLsn);
+ // In case of merge, oldComponents are never null
+ LongPointable markerLsn =
+ LongPointable.FACTORY.createPointable(ComponentUtils.getLong(oldComponents.get(0).getMetadata(),
+ ComponentUtils.MARKER_LSN_KEY, ComponentUtils.NOT_FOUND));
+ newComponent.getMetadata().put(ComponentUtils.MARKER_LSN_KEY, markerLsn);
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
index bbe2c4f..abe474b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
@@ -26,7 +26,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
-import org.apache.hyracks.storage.am.lsm.common.utils.ComponentMetadataUtil;
+import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
/**
* A basic callback used to write marker to transaction logs
@@ -52,17 +52,17 @@
private long getLsn() {
long lsn;
try {
- lsn = ComponentMetadataUtil.getLong(index.getCurrentMemoryComponent().getMetadata(),
- ComponentMetadataUtil.MARKER_LSN_KEY, ComponentMetadataUtil.NOT_FOUND);
+ lsn = ComponentUtils.getLong(index.getCurrentMemoryComponent().getMetadata(), ComponentUtils.MARKER_LSN_KEY,
+ ComponentUtils.NOT_FOUND);
} catch (HyracksDataException e) {
// Should never happen since this is a memory component
throw new IllegalStateException(e);
}
- if (lsn == ComponentMetadataUtil.NOT_FOUND) {
+ if (lsn == ComponentUtils.NOT_FOUND) {
synchronized (index.getOperationTracker()) {
// look for it in previous memory component if exists
lsn = lsnFromImmutableMemoryComponents();
- if (lsn == ComponentMetadataUtil.NOT_FOUND) {
+ if (lsn == ComponentUtils.NOT_FOUND) {
// look for it in disk component
lsn = lsnFromDiskComponents();
}
@@ -72,26 +72,26 @@
}
private long lsnFromDiskComponents() {
- List<ILSMDiskComponent> diskComponents = index.getImmutableComponents();
+ List<ILSMDiskComponent> diskComponents = index.getDiskComponents();
for (ILSMDiskComponent c : diskComponents) {
try {
- long lsn = ComponentMetadataUtil.getLong(c.getMetadata(), ComponentMetadataUtil.MARKER_LSN_KEY,
- ComponentMetadataUtil.NOT_FOUND);
- if (lsn != ComponentMetadataUtil.NOT_FOUND) {
+ long lsn = ComponentUtils.getLong(c.getMetadata(), ComponentUtils.MARKER_LSN_KEY,
+ ComponentUtils.NOT_FOUND);
+ if (lsn != ComponentUtils.NOT_FOUND) {
return lsn;
}
} catch (HyracksDataException e) {
throw new IllegalStateException("Unable to read metadata page. Disk Error?", e);
}
}
- return ComponentMetadataUtil.NOT_FOUND;
+ return ComponentUtils.NOT_FOUND;
}
private long lsnFromImmutableMemoryComponents() {
List<ILSMMemoryComponent> memComponents = index.getMemoryComponents();
int numOtherMemComponents = memComponents.size() - 1;
int next = index.getCurrentMemoryComponentIndex();
- long lsn = ComponentMetadataUtil.NOT_FOUND;
+ long lsn = ComponentUtils.NOT_FOUND;
for (int i = 0; i < numOtherMemComponents; i++) {
next = next - 1;
if (next < 0) {
@@ -100,13 +100,13 @@
ILSMMemoryComponent c = index.getMemoryComponents().get(next);
if (c.isReadable()) {
try {
- lsn = ComponentMetadataUtil.getLong(c.getMetadata(), ComponentMetadataUtil.MARKER_LSN_KEY,
- ComponentMetadataUtil.NOT_FOUND);
+ lsn = ComponentUtils.getLong(c.getMetadata(), ComponentUtils.MARKER_LSN_KEY,
+ ComponentUtils.NOT_FOUND);
} catch (HyracksDataException e) {
// Should never happen since this is a memory component
throw new IllegalStateException(e);
}
- if (lsn != ComponentMetadataUtil.NOT_FOUND) {
+ if (lsn != ComponentUtils.NOT_FOUND) {
return lsn;
}
}
@@ -117,7 +117,7 @@
@Override
public void after(long lsn) {
pointable.setLong(lsn);
- index.getCurrentMemoryComponent().getMetadata().put(ComponentMetadataUtil.MARKER_LSN_KEY, pointable);
+ index.getCurrentMemoryComponent().getMetadata().put(ComponentUtils.MARKER_LSN_KEY, pointable);
}
public ILSMIndex getIndex() {
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
index c18ecc2..9f071bb 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
@@ -218,7 +218,7 @@
}
ILSMIndex index = Mockito.mock(ILSMIndex.class);
- Mockito.when(index.getImmutableComponents()).thenReturn(components);
+ Mockito.when(index.getDiskComponents()).thenReturn(components);
ILSMIndexAccessor accessor = Mockito.mock(ILSMIndexAccessor.class);
Mockito.doAnswer(new Answer<Void>() {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
index 2eb84fb..12f49a8 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
@@ -92,7 +92,7 @@
super.open();
primaryIndexHelper.open();
primaryIndex = (ILSMIndex) primaryIndexHelper.getIndexInstance();
- diskComponents = new ILSMDiskComponent[primaryIndex.getImmutableComponents().size()];
+ diskComponents = new ILSMDiskComponent[primaryIndex.getDiskComponents().size()];
secondaryIndexHelper.open();
secondaryIndex = (ILSMIndex) secondaryIndexHelper.getIndexInstance();
@@ -221,7 +221,7 @@
}
private void activateComponents() throws HyracksDataException {
- List<ILSMDiskComponent> primaryComponents = primaryIndex.getImmutableComponents();
+ List<ILSMDiskComponent> primaryComponents = primaryIndex.getDiskComponents();
for (int i = diskComponents.length - 1; i >= 0; i--) {
// start from the oldest component to the newest component
if (diskComponents[i] != null && diskComponents[i].getComponentSize() > 0) {
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index 9c9fd17..75e193b 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -965,6 +965,12 @@
<type>test-jar</type>
</dependency>
<dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-storage-am-lsm-btree-test</artifactId>
+ <version>${hyracks.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.5</version>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index ef3f13b..a071345 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -129,6 +129,10 @@
public static final int JOB_HAS_NOT_BEEN_CREATED_YET = 93;
public static final int CANNOT_READ_CLOSED_FILE = 94;
public static final int TUPLE_CANNOT_FIT_INTO_EMPTY_FRAME = 95;
+ public static final int ILLEGAL_ATTEMPT_TO_ENTER_EMPTY_COMPONENT = 96;
+ public static final int ILLEGAL_ATTEMPT_TO_EXIT_EMPTY_COMPONENT = 97;
+ public static final int A_FLUSH_OPERATION_HAS_FAILED = 98;
+ public static final int A_MERGE_OPERATION_HAS_FAILED = 99;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 40df3d8..ae579b4 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -112,5 +112,9 @@
93 = Job %1$s has not been created yet
94 = Cannot read closed file (%1$s)
95 = Tuple of size %1$s cannot fit into an empty frame
+96 = Illegal attempt to enter empty component
+97 = Illegal attempt to exit empty component
+98 = A flush operation has failed
+99 = A merge operation has failed
10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java
index 71a3f71..ff47d27 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java
@@ -33,5 +33,7 @@
FULL_MERGE,
FLUSH,
REPLICATE,
- DISK_COMPONENT_SCAN
+ DISK_COMPONENT_SCAN,
+ DELETE_MEMORY_COMPONENT,
+ DELETE_DISK_COMPONENTS
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
index 86d926f..3775985 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
@@ -141,7 +141,7 @@
// is needed.
// It only needs to return the newer list
@Override
- public List<ILSMDiskComponent> getImmutableComponents() {
+ public List<ILSMDiskComponent> getDiskComponents() {
if (version == 0) {
return diskComponents;
} else if (version == 1) {
@@ -195,7 +195,7 @@
LSMComponentFileReferences relMergeFileRefs =
fileManager.getRelMergeFileReference(firstFile.getFile().getName(), lastFile.getFile().getName());
ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory);
- ioScheduler.scheduleOperation(new LSMBTreeMergeOperation(accessor, mergingComponents, cursor,
+ ioScheduler.scheduleOperation(new LSMBTreeMergeOperation(accessor, cursor,
relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(),
callback, fileManager.getBaseDir().getAbsolutePath()));
}
@@ -376,7 +376,7 @@
// Not supported
@Override
- public ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException {
+ public ILSMDiskComponent doFlush(ILSMIOOperation operation) throws HyracksDataException {
throw new UnsupportedOperationException("flush not supported in LSM-Disk-Only-BTree");
}
@@ -465,7 +465,7 @@
if (isTransaction) {
// Since this is a transaction component, validate and
// deactivate. it could later be added or deleted
- markAsValid(component);
+ component.markAsValid(durable);
BTree btree = ((LSMBTreeDiskComponent) component).getBTree();
BloomFilter bloomFilter = ((LSMBTreeDiskComponent) component).getBloomFilter();
btree.deactivate();
@@ -506,11 +506,6 @@
}
}
- @Override
- public String toString() {
- return "LSMTwoPCBTree [" + fileManager.getBaseDir() + "]";
- }
-
// The accessor for disk only indexes don't use modification callback and always carry the target index version with them
@Override
public ILSMIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
index 7462c7a..ff17905 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
@@ -325,7 +325,7 @@
}
@Override
- public ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException {
+ public ILSMDiskComponent doFlush(ILSMIOOperation operation) throws HyracksDataException {
throw HyracksDataException.create(ErrorCode.FLUSH_NOT_SUPPORTED_IN_EXTERNAL_INDEX);
}
@@ -364,10 +364,10 @@
.get(secondDiskComponents.size() - 1);
}
- ioScheduler.scheduleOperation(new LSMBTreeWithBuddyMergeOperation(accessor, mergingComponents, cursor,
- relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getDeleteIndexFileReference(),
- relMergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath(),
- keepDeleteTuples));
+ ioScheduler.scheduleOperation(
+ new LSMBTreeWithBuddyMergeOperation(accessor, cursor, relMergeFileRefs.getInsertIndexFileReference(),
+ relMergeFileRefs.getDeleteIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(),
+ callback, fileManager.getBaseDir().getAbsolutePath(), keepDeleteTuples));
}
// This method creates the appropriate opContext for the targeted version
@@ -378,12 +378,11 @@
}
@Override
- public ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException {
+ public ILSMDiskComponent doMerge(ILSMIOOperation operation) throws HyracksDataException {
LSMBTreeWithBuddyMergeOperation mergeOp = (LSMBTreeWithBuddyMergeOperation) operation;
IIndexCursor cursor = mergeOp.getCursor();
ISearchPredicate btreeSearchPred = new RangePredicate(null, null, true, true, null, null);
ILSMIndexOperationContext opCtx = ((LSMBTreeWithBuddySortedCursor) cursor).getOpCtx();
- opCtx.getComponentHolder().addAll(mergeOp.getMergingComponents());
search(opCtx, cursor, btreeSearchPred);
LSMBTreeWithBuddyDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getTarget(),
@@ -477,15 +476,6 @@
}
}
- @Override
- public void markAsValid(ILSMDiskComponent lsmComponent) throws HyracksDataException {
- LSMBTreeWithBuddyDiskComponent component = (LSMBTreeWithBuddyDiskComponent) lsmComponent;
- // Flush the bloom filter first.
- markAsValidInternal(component.getBTree().getBufferCache(), component.getBloomFilter());
- markAsValidInternal(component.getBTree());
- markAsValidInternal(component.getBuddyBTree());
- }
-
// This function is used when a new component is to be committed -- is
// called by the harness.
@Override
@@ -629,7 +619,7 @@
if (isTransaction) {
// Since this is a transaction component, validate and
// deactivate. it could later be added or deleted
- markAsValid(component);
+ component.markAsValid(durable);
BTree btree = ((LSMBTreeWithBuddyDiskComponent) component).getBTree();
BTree buddyBtree = ((LSMBTreeWithBuddyDiskComponent) component).getBuddyBTree();
BloomFilter bloomFilter = ((LSMBTreeWithBuddyDiskComponent) component).getBloomFilter();
@@ -816,15 +806,14 @@
@Override
protected ILSMIOOperation createFlushOperation(AbstractLSMIndexOperationContext opCtx,
- ILSMMemoryComponent flushingComponent, LSMComponentFileReferences componentFileRefs,
- ILSMIOOperationCallback callback) throws HyracksDataException {
+ LSMComponentFileReferences componentFileRefs, ILSMIOOperationCallback callback)
+ throws HyracksDataException {
return null;
}
@Override
protected ILSMIOOperation createMergeOperation(AbstractLSMIndexOperationContext opCtx,
- List<ILSMComponent> mergingComponents, LSMComponentFileReferences mergeFileRefs,
- ILSMIOOperationCallback callback) throws HyracksDataException {
+ LSMComponentFileReferences mergeFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException {
return null;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 1920592..a0d5a22 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -64,7 +64,6 @@
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFilterManager;
-import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor.ICursorFactory;
import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
@@ -295,7 +294,7 @@
}
@Override
- public ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException {
+ public ILSMDiskComponent doFlush(ILSMIOOperation operation) throws HyracksDataException {
LSMBTreeFlushOperation flushOp = (LSMBTreeFlushOperation) operation;
LSMBTreeMemoryComponent flushingComponent = (LSMBTreeMemoryComponent) flushOp.getFlushingComponent();
IIndexAccessor accessor = flushingComponent.getBTree().createAccessor(NoOpOperationCallback.INSTANCE,
@@ -361,15 +360,12 @@
}
@Override
- public ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException {
+ public ILSMDiskComponent doMerge(ILSMIOOperation operation) throws HyracksDataException {
LSMBTreeMergeOperation mergeOp = (LSMBTreeMergeOperation) operation;
IIndexCursor cursor = mergeOp.getCursor();
RangePredicate rangePred = new RangePredicate(null, null, true, true, null, null);
- ILSMIndexOperationContext opCtx = ((LSMIndexSearchCursor) cursor).getOpCtx();
- opCtx.getComponentHolder().addAll(mergeOp.getMergingComponents());
- search(opCtx, cursor, rangePred);
+ search(mergeOp.getAccessor().getOpContext(), cursor, rangePred);
List<ILSMComponent> mergedComponents = mergeOp.getMergingComponents();
-
long numElements = 0L;
if (hasBloomFilter) {
//count elements in btree for creating Bloomfilter
@@ -400,9 +396,7 @@
getFilterManager().updateFilter(mergedComponent.getLSMComponentFilter(), filterTuples);
getFilterManager().writeFilter(mergedComponent.getLSMComponentFilter(), mergedComponent.getBTree());
}
-
componentBulkLoader.end();
-
return mergedComponent;
}
@@ -464,22 +458,10 @@
}
@Override
- public void markAsValid(ILSMDiskComponent lsmComponent) throws HyracksDataException {
- // The order of forcing the dirty page to be flushed is critical. The
- // bloom filter must be always done first.
- LSMBTreeDiskComponent component = (LSMBTreeDiskComponent) lsmComponent;
- if (hasBloomFilter) {
- markAsValidInternal(component.getBTree().getBufferCache(), component.getBloomFilter());
- }
- markAsValidInternal(component.getBTree());
- }
-
- @Override
protected ILSMIOOperation createFlushOperation(AbstractLSMIndexOperationContext opCtx,
- ILSMMemoryComponent flushingComponent, LSMComponentFileReferences componentFileRefs,
- ILSMIOOperationCallback callback) {
+ LSMComponentFileReferences componentFileRefs, ILSMIOOperationCallback callback) {
ILSMIndexAccessor accessor = createAccessor(opCtx);
- return new LSMBTreeFlushOperation(accessor, flushingComponent, componentFileRefs.getInsertIndexFileReference(),
+ return new LSMBTreeFlushOperation(accessor, componentFileRefs.getInsertIndexFileReference(),
componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath());
}
@@ -552,11 +534,6 @@
}
@Override
- public String toString() {
- return "LSMBTree [" + fileManager.getBaseDir() + "]";
- }
-
- @Override
public Set<String> getLSMComponentPhysicalFiles(ILSMComponent lsmComponent) {
Set<String> files = new HashSet<>();
LSMBTreeDiskComponent component = (LSMBTreeDiskComponent) lsmComponent;
@@ -625,16 +602,15 @@
@Override
protected ILSMIOOperation createMergeOperation(AbstractLSMIndexOperationContext opCtx,
- List<ILSMComponent> mergingComponents, LSMComponentFileReferences mergeFileRefs,
- ILSMIOOperationCallback callback) {
+ LSMComponentFileReferences mergeFileRefs, ILSMIOOperationCallback callback) {
boolean returnDeletedTuples = false;
ILSMIndexAccessor accessor = createAccessor(opCtx);
+ List<ILSMComponent> mergingComponents = opCtx.getComponentHolder();
if (mergingComponents.get(mergingComponents.size() - 1) != diskComponents.get(diskComponents.size() - 1)) {
returnDeletedTuples = true;
}
ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor(opCtx, returnDeletedTuples);
- return new LSMBTreeMergeOperation(accessor, mergingComponents, cursor,
- mergeFileRefs.getInsertIndexFileReference(), mergeFileRefs.getBloomFilterFileReference(), callback,
- fileManager.getBaseDir().getAbsolutePath());
+ return new LSMBTreeMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(),
+ mergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
index 33bb60e..4d60ab4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
public class LSMBTreeDiskComponent extends AbstractLSMDiskComponent {
private final BTree btree;
@@ -68,4 +69,14 @@
public String toString() {
return getClass().getSimpleName() + ":" + btree.getFileReference().getRelativePath();
}
+
+ @Override
+ public void markAsValid(boolean persist) throws HyracksDataException {
+ // The order of forcing the dirty page to be flushed is critical.
+ // The bloom filter must be always done first.
+ if (bloomFilter != null && persist) {
+ ComponentUtils.markAsValid(btree.getBufferCache(), bloomFilter, persist);
+ }
+ ComponentUtils.markAsValid(btree, persist);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
index 4a06778..e3424e5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
@@ -21,16 +21,14 @@
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
public class LSMBTreeFlushOperation extends FlushOperation {
private final FileReference bloomFilterFlushTarget;
- public LSMBTreeFlushOperation(ILSMIndexAccessor accessor, ILSMMemoryComponent flushingComponent,
- FileReference flushTarget, FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback,
- String indexIdentifier) {
- super(accessor, flushingComponent, flushTarget, callback, indexIdentifier);
+ public LSMBTreeFlushOperation(ILSMIndexAccessor accessor, FileReference flushTarget,
+ FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier) {
+ super(accessor, flushTarget, callback, indexIdentifier);
this.bloomFilterFlushTarget = bloomFilterFlushTarget;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
index 0cc76f2..ec96303 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
@@ -19,11 +19,8 @@
package org.apache.hyracks.storage.am.lsm.btree.impls;
-import java.util.List;
-
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation;
@@ -32,10 +29,9 @@
private final FileReference bloomFilterMergeTarget;
- public LSMBTreeMergeOperation(ILSMIndexAccessor accessor, List<ILSMComponent> mergingComponents,
- ITreeIndexCursor cursor, FileReference target, FileReference bloomFilterMergeTarget,
- ILSMIOOperationCallback callback, String indexIdentifier) {
- super(accessor, target, callback, indexIdentifier, mergingComponents, cursor);
+ public LSMBTreeMergeOperation(ILSMIndexAccessor accessor, ITreeIndexCursor cursor, FileReference target,
+ FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier) {
+ super(accessor, target, callback, indexIdentifier, cursor);
this.bloomFilterMergeTarget = bloomFilterMergeTarget;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java
index 0ba7c30..4aee950 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
public class LSMBTreeWithBuddyDiskComponent extends AbstractLSMDiskComponent {
@@ -78,4 +79,11 @@
public String toString() {
return getClass().getSimpleName() + ":" + btree.getFileReference().getRelativePath();
}
+
+ @Override
+ public void markAsValid(boolean persist) throws HyracksDataException {
+ ComponentUtils.markAsValid(btree.getBufferCache(), bloomFilter, persist);
+ ComponentUtils.markAsValid(btree, persist);
+ ComponentUtils.markAsValid(buddyBtree, persist);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java
index 2817f3a..f682bde 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java
@@ -18,11 +18,8 @@
*/
package org.apache.hyracks.storage.am.lsm.btree.impls;
-import java.util.List;
-
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation;
@@ -33,11 +30,10 @@
private final FileReference bloomFilterMergeTarget;
private final boolean keepDeletedTuples;
- public LSMBTreeWithBuddyMergeOperation(ILSMIndexAccessor accessor, List<ILSMComponent> mergingComponents,
- ITreeIndexCursor cursor, FileReference target, FileReference buddyBtreeMergeTarget,
- FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier,
- boolean keepDeletedTuples) {
- super(accessor, target, callback, indexIdentifier, mergingComponents, cursor);
+ public LSMBTreeWithBuddyMergeOperation(ILSMIndexAccessor accessor, ITreeIndexCursor cursor, FileReference target,
+ FileReference buddyBtreeMergeTarget, FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback,
+ String indexIdentifier, boolean keepDeletedTuples) {
+ super(accessor, target, callback, indexIdentifier, cursor);
this.buddyBtreeMergeTarget = buddyBtreeMergeTarget;
this.bloomFilterMergeTarget = bloomFilterMergeTarget;
this.keepDeletedTuples = keepDeletedTuples;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
index aed641d..b101315 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
@@ -22,6 +22,7 @@
import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
public interface ILSMDiskComponent extends ILSMComponent {
+
@Override
default LSMComponentType getType() {
return LSMComponentType.DISK;
@@ -49,9 +50,19 @@
/**
* Return the component Id of this disk component from its metadata
+ *
* @return
* @throws HyracksDataException
*/
ILSMDiskComponentId getComponentId() throws HyracksDataException;
+ /**
+ * Mark the component as valid
+ *
+ * @param persist
+ * whether the call should force data to disk before returning
+ * @throws HyracksDataException
+ */
+ void markAsValid(boolean persist) throws HyracksDataException;
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index c0a3f2d..89c8cb9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.storage.am.lsm.common.api;
import java.util.List;
+import java.util.function.Predicate;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
@@ -242,4 +243,14 @@
*/
void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple,
IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback) throws HyracksDataException;
+
+ /**
+ * Rollback components that match the passed predicate
+ *
+ * @param ctx
+ * @param predicate
+ * @throws HyracksDataException
+ */
+ void deleteComponents(ILSMIndexOperationContext ctx, Predicate<ILSMComponent> predicate)
+ throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
index a13f82c..c2ae786 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
@@ -58,9 +58,12 @@
Boolean call() throws HyracksDataException;
/**
- * The target of the io operation
- *
- * @return
+ * @return The target of the io operation
*/
FileReference getTarget();
+
+ /**
+ * @return the accessor of the operation
+ */
+ ILSMIndexAccessor getAccessor();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index ae2c93d..addeb27 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -57,7 +57,7 @@
/**
* components with lower indexes are newer than components with higher index
*/
- List<ILSMDiskComponent> getImmutableComponents();
+ List<ILSMDiskComponent> getDiskComponents();
boolean isPrimaryIndex();
@@ -99,15 +99,6 @@
void addInactiveDiskComponent(ILSMDiskComponent diskComponent);
- /**
- * Persist the LSM component
- *
- * @param lsmComponent
- * , the component to be persistent
- * @throws HyracksDataException
- */
- void markAsValid(ILSMDiskComponent lsmComponent) throws HyracksDataException;
-
boolean isCurrentMutableComponentEmpty() throws HyracksDataException;
void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMDiskComponent> diskComponents, boolean bulkload,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index 1042df2..b8d64af 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.storage.am.lsm.common.api;
import java.util.List;
+import java.util.function.Predicate;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
@@ -36,6 +37,11 @@
public interface ILSMIndexAccessor extends IIndexAccessor {
/**
+ * @return the operation context associated with the accessor
+ */
+ ILSMIndexOperationContext getOpContext();
+
+ /**
* Schedule a flush operation
*
* @param callback
@@ -245,4 +251,13 @@
* If the BufferCache throws while un/pinning or un/latching.
*/
void scanDiskComponents(IIndexCursor cursor) throws HyracksDataException;
+
+ /**
+ * Delete components that match the passed predicate
+ * NOTE: This call can only be made when the caller knows that data modification has been stopped
+ *
+ * @param filter
+ * @throws HyracksDataException
+ */
+ void deleteComponents(Predicate<ILSMComponent> predicate) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
index 6e51f83..aee46f0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
@@ -54,6 +54,7 @@
return target;
}
+ @Override
public ILSMIndexAccessor getAccessor() {
return accessor;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
index 64b8fec..4386d52 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
@@ -24,7 +24,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
-import org.apache.hyracks.storage.am.lsm.common.utils.ComponentMetadataUtil;
+import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent implements ILSMDiskComponent {
@@ -103,9 +103,9 @@
@Override
public ILSMDiskComponentId getComponentId() throws HyracksDataException {
- long minID = ComponentMetadataUtil.getLong(metadata, ILSMDiskComponentId.COMPONENT_ID_MIN_KEY,
+ long minID = ComponentUtils.getLong(metadata, ILSMDiskComponentId.COMPONENT_ID_MIN_KEY,
ILSMDiskComponentId.NOT_FOUND);
- long maxID = ComponentMetadataUtil.getLong(metadata, ILSMDiskComponentId.COMPONENT_ID_MAX_KEY,
+ long maxID = ComponentUtils.getLong(metadata, ILSMDiskComponentId.COMPONENT_ID_MAX_KEY,
ILSMDiskComponentId.NOT_FOUND);
//TODO: do we need to throw an exception when ID is not found?
return new LSMDiskComponentId(minID, maxID);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index fe6d20f..c471cfb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -35,8 +35,6 @@
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
-import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.common.impls.AbstractSearchPredicate;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -191,15 +189,14 @@
throw HyracksDataException.create(ErrorCode.CANNOT_DEACTIVATE_INACTIVE_INDEX);
}
if (flush) {
- flushMemoryComponents();
+ flushMemoryComponent();
}
deactivateDiskComponents();
deactivateMemoryComponents();
isActive = false;
}
- // What if more than one memory component needs flushing??
- protected void flushMemoryComponents() throws HyracksDataException {
+ protected void flushMemoryComponent() throws HyracksDataException {
BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(ioOpCallback);
ILSMIndexAccessor accessor = createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
accessor.scheduleFlush(cb);
@@ -278,6 +275,7 @@
case UPDATE:
case PHYSICALDELETE:
case FLUSH:
+ case DELETE_MEMORY_COMPONENT:
case DELETE:
case UPSERT:
operationalComponents.add(memoryComponents.get(cmc));
@@ -305,6 +303,7 @@
break;
case MERGE:
+ case DELETE_DISK_COMPONENTS:
operationalComponents.addAll(ctx.getComponentsToBeMerged());
break;
case FULL_MERGE:
@@ -329,28 +328,28 @@
@Override
public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
- ILSMMemoryComponent flushingComponent = (ILSMMemoryComponent) ctx.getComponentHolder().get(0);
LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
AbstractLSMIndexOperationContext opCtx =
createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- opCtx.setOperation(IndexOperation.FLUSH);
- opCtx.getComponentHolder().add(flushingComponent);
- ILSMIOOperation flushOp = createFlushOperation(opCtx, flushingComponent, componentFileRefs, callback);
+ opCtx.setOperation(ctx.getOperation());
+ opCtx.getComponentHolder().addAll(ctx.getComponentHolder());
+ ILSMIOOperation flushOp = createFlushOperation(opCtx, componentFileRefs, callback);
ioScheduler.scheduleOperation(TracedIOOperation.wrap(flushOp, tracer));
}
@Override
public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
+ List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
// merge must create a different op ctx
AbstractLSMIndexOperationContext opCtx =
createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- opCtx.setOperation(IndexOperation.MERGE);
- List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
+ opCtx.setOperation(ctx.getOperation());
+ opCtx.getComponentHolder().addAll(mergingComponents);
ILSMDiskComponent firstComponent = (ILSMDiskComponent) mergingComponents.get(0);
ILSMDiskComponent lastComponent = (ILSMDiskComponent) mergingComponents.get(mergingComponents.size() - 1);
LSMComponentFileReferences mergeFileRefs = getMergeFileReferences(firstComponent, lastComponent);
- ILSMIOOperation mergeOp = createMergeOperation(opCtx, mergingComponents, mergeFileRefs, callback);
+ ILSMIOOperation mergeOp = createMergeOperation(opCtx, mergeFileRefs, callback);
ioScheduler.scheduleOperation(TracedIOOperation.wrap(mergeOp, tracer));
}
@@ -391,28 +390,11 @@
memoryComponentsAllocated = true;
}
- protected void markAsValidInternal(ITreeIndex treeIndex) throws HyracksDataException {
- int fileId = treeIndex.getFileId();
- IBufferCache bufferCache = treeIndex.getBufferCache();
- treeIndex.getPageManager().close();
- // WARNING: flushing the metadata page should be done after releasing the write latch; otherwise, the page
- // won't be flushed to disk because it won't be dirty until the write latch has been released.
- // Force modified metadata page to disk.
- // If the index is not durable, then the flush is not necessary.
- if (durable) {
- bufferCache.force(fileId, true);
- }
- }
-
- protected void markAsValidInternal(IBufferCache bufferCache, BloomFilter filter) throws HyracksDataException {
- if (durable) {
- bufferCache.force(filter.getFileId(), true);
- }
- }
-
@Override
public void addDiskComponent(ILSMDiskComponent c) throws HyracksDataException {
- diskComponents.add(0, c);
+ if (c != EmptyComponent.INSTANCE) {
+ diskComponents.add(0, c);
+ }
}
@Override
@@ -420,7 +402,9 @@
throws HyracksDataException {
int swapIndex = diskComponents.indexOf(mergedComponents.get(0));
diskComponents.removeAll(mergedComponents);
- diskComponents.add(swapIndex, newComponent);
+ if (newComponent != EmptyComponent.INSTANCE) {
+ diskComponents.add(swapIndex, newComponent);
+ }
}
@Override
@@ -430,7 +414,7 @@
}
@Override
- public List<ILSMDiskComponent> getImmutableComponents() {
+ public List<ILSMDiskComponent> getDiskComponents() {
return diskComponents;
}
@@ -477,8 +461,10 @@
}
@Override
- public String toString() {
- return "LSMIndex [" + fileManager.getBaseDir() + "]";
+ public final String toString() {
+ return "{\"class\" : \"" + getClass().getSimpleName() + "\", \"dir\" : \"" + fileManager.getBaseDir()
+ + "\", \"memory\" : " + (memoryComponents == null ? 0 : memoryComponents.size()) + ", \"disk\" : "
+ + diskComponents.size() + "}";
}
@Override
@@ -626,6 +612,22 @@
return size;
}
+ @Override
+ public final ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException {
+ ILSMIndexAccessor accessor = operation.getAccessor();
+ ILSMIndexOperationContext opCtx = accessor.getOpContext();
+ return opCtx.getOperation() == IndexOperation.DELETE_MEMORY_COMPONENT ? EmptyComponent.INSTANCE
+ : doFlush(operation);
+ }
+
+ @Override
+ public final ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException {
+ ILSMIndexAccessor accessor = operation.getAccessor();
+ ILSMIndexOperationContext opCtx = accessor.getOpContext();
+ return opCtx.getOperation() == IndexOperation.DELETE_DISK_COMPONENTS ? EmptyComponent.INSTANCE
+ : doMerge(operation);
+ }
+
public abstract Set<String> getLSMComponentPhysicalFiles(ILSMComponent newComponent);
protected abstract void allocateMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException;
@@ -659,11 +661,13 @@
throws HyracksDataException;
protected abstract ILSMIOOperation createFlushOperation(AbstractLSMIndexOperationContext opCtx,
- ILSMMemoryComponent flushingComponent, LSMComponentFileReferences componentFileRefs,
- ILSMIOOperationCallback callback) throws HyracksDataException;
+ LSMComponentFileReferences componentFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException;
protected abstract ILSMIOOperation createMergeOperation(AbstractLSMIndexOperationContext opCtx,
- List<ILSMComponent> mergingComponents, LSMComponentFileReferences mergeFileRefs,
- ILSMIOOperationCallback callback) throws HyracksDataException;
+ LSMComponentFileReferences mergeFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException;
+
+ protected abstract ILSMDiskComponent doFlush(ILSMIOOperation operation) throws HyracksDataException;
+
+ protected abstract ILSMDiskComponent doMerge(ILSMIOOperation operation) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
index 6b9af7e..847b882 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
@@ -36,7 +36,7 @@
@Override
public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException {
- List<ILSMDiskComponent> immutableComponents = index.getImmutableComponents();
+ List<ILSMDiskComponent> immutableComponents = index.getDiskComponents();
if (!areComponentsMergable(immutableComponents)) {
return;
@@ -84,7 +84,7 @@
* there will be no new merge either in this situation.
*/
- List<ILSMDiskComponent> immutableComponents = index.getImmutableComponents();
+ List<ILSMDiskComponent> immutableComponents = index.getDiskComponents();
int totalImmutableComponentCount = immutableComponents.size();
// [case 1]
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
new file mode 100644
index 0000000..0134dca
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+
+public class EmptyComponent implements ILSMDiskComponent {
+ public static final EmptyComponent INSTANCE = new EmptyComponent();
+
+ private EmptyComponent() {
+ }
+
+ @Override
+ public boolean threadEnter(LSMOperationType opType, boolean isMutableComponent) throws HyracksDataException {
+ throw HyracksDataException.create(ErrorCode.ILLEGAL_ATTEMPT_TO_ENTER_EMPTY_COMPONENT);
+ }
+
+ @Override
+ public void threadExit(LSMOperationType opType, boolean failedOperation, boolean isMutableComponent)
+ throws HyracksDataException {
+ throw HyracksDataException.create(ErrorCode.ILLEGAL_ATTEMPT_TO_EXIT_EMPTY_COMPONENT);
+ }
+
+ @Override
+ public ComponentState getState() {
+ return ComponentState.INACTIVE;
+ }
+
+ @Override
+ public ILSMComponentFilter getLSMComponentFilter() {
+ return null;
+ }
+
+ @Override
+ public DiskComponentMetadata getMetadata() {
+ return EmptyDiskComponentMetadata.INSTANCE;
+ }
+
+ @Override
+ public long getComponentSize() {
+ return 0;
+ }
+
+ @Override
+ public int getFileReferenceCount() {
+ return 0;
+ }
+
+ @Override
+ public void destroy() throws HyracksDataException {
+ // No Op
+ }
+
+ @Override
+ public ILSMDiskComponentId getComponentId() throws HyracksDataException {
+ return null;
+ }
+
+ @Override
+ public void markAsValid(boolean persist) throws HyracksDataException {
+ // No Op
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java
new file mode 100644
index 0000000..7d1925b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public class EmptyDiskComponentMetadata extends DiskComponentMetadata {
+ public static final EmptyDiskComponentMetadata INSTANCE = new EmptyDiskComponentMetadata();
+
+ private EmptyDiskComponentMetadata() {
+ super(null);
+ }
+
+ @Override
+ public void put(IValueReference key, IValueReference value) throws HyracksDataException {
+ // No op
+ }
+
+ @Override
+ public void get(IValueReference key, IPointable value) throws HyracksDataException {
+ throw new IllegalStateException("Attempt to read metadata of empty component");
+ }
+
+ @Override
+ public IValueReference get(IValueReference key) throws HyracksDataException {
+ throw new IllegalStateException("Attempt to read metadata of empty component");
+ }
+
+ @Override
+ public void put(MemoryComponentMetadata metadata) throws HyracksDataException {
+ // No op
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
index e9b2058..2f65b18 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
@@ -236,7 +236,7 @@
try {
newComponent = lsmIndex.merge(operation);
operation.getCallback().afterOperation(LSMOperationType.MERGE, ctx.getComponentHolder(), newComponent);
- lsmIndex.markAsValid(newComponent);
+ newComponent.markAsValid(lsmIndex.isDurable());
} finally {
exitComponents(ctx, LSMOperationType.MERGE, newComponent, false);
operation.getCallback().afterFinalize(LSMOperationType.MERGE, newComponent);
@@ -248,7 +248,7 @@
@Override
public void addBulkLoadedComponent(ILSMDiskComponent c) throws HyracksDataException {
- lsmIndex.markAsValid(c);
+ c.markAsValid(lsmIndex.isDurable());
synchronized (opTracker) {
lsmIndex.addDiskComponent(c);
if (replicationEnabled) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java
index 1173aeb..7b7f950 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java
@@ -26,16 +26,12 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
public class FlushOperation extends AbstractIoOperation implements Comparable<ILSMIOOperation> {
- protected final ILSMMemoryComponent flushingComponent;
-
- public FlushOperation(ILSMIndexAccessor accessor, ILSMMemoryComponent flushingComponent, FileReference target,
- ILSMIOOperationCallback callback, String indexIdentifier) {
+ public FlushOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback,
+ String indexIdentifier) {
super(accessor, target, callback, indexIdentifier);
- this.flushingComponent = flushingComponent;
}
@Override
@@ -55,7 +51,7 @@
}
public ILSMComponent getFlushingComponent() {
- return flushingComponent;
+ return accessor.getOpContext().getComponentHolder().get(0);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 1fc702c..1ef807f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -23,6 +23,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -51,6 +52,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.am.lsm.common.util.IOOperationUtils;
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.util.trace.Tracer;
@@ -120,7 +122,8 @@
}
break;
case MERGE:
- if (ctx.getComponentHolder().size() < 2) {
+ if (ctx.getComponentHolder().size() < 2
+ && ctx.getOperation() != IndexOperation.DELETE_DISK_COMPONENTS) {
// There is only a single component. There is nothing to merge.
return false;
}
@@ -518,7 +521,7 @@
try {
newComponent = lsmIndex.flush(operation);
operation.getCallback().afterOperation(LSMOperationType.FLUSH, null, newComponent);
- lsmIndex.markAsValid(newComponent);
+ newComponent.markAsValid(lsmIndex.isDurable());
} catch (Throwable e) {
failedOperation = true;
if (LOGGER.isLoggable(Level.SEVERE)) {
@@ -569,7 +572,7 @@
try {
newComponent = lsmIndex.merge(operation);
operation.getCallback().afterOperation(LSMOperationType.MERGE, ctx.getComponentHolder(), newComponent);
- lsmIndex.markAsValid(newComponent);
+ newComponent.markAsValid(lsmIndex.isDurable());
} catch (Throwable e) {
failedOperation = true;
if (LOGGER.isLoggable(Level.SEVERE)) {
@@ -602,7 +605,7 @@
@Override
public void addBulkLoadedComponent(ILSMDiskComponent c) throws HyracksDataException {
- lsmIndex.markAsValid(c);
+ c.markAsValid(lsmIndex.isDurable());
synchronized (opTracker) {
lsmIndex.addDiskComponent(c);
if (replicationEnabled) {
@@ -753,6 +756,105 @@
}
@Override
+ public void deleteComponents(ILSMIndexOperationContext ctx, Predicate<ILSMComponent> predicate)
+ throws HyracksDataException {
+ BlockingIOOperationCallbackWrapper ioCallback =
+ new BlockingIOOperationCallbackWrapper(lsmIndex.getIOOperationCallback());
+ boolean deleteMemoryComponent;
+ synchronized (opTracker) {
+ waitForFlushesAndMerges();
+ ensureNoFailedFlush();
+ // We always start with the memory component
+ ILSMMemoryComponent memComponent = lsmIndex.getCurrentMemoryComponent();
+ deleteMemoryComponent = predicate.test(memComponent);
+ if (deleteMemoryComponent) {
+ // schedule a delete for flushed component
+ ctx.reset();
+ ctx.setOperation(IndexOperation.DELETE_MEMORY_COMPONENT);
+ // ScheduleFlush is actually a try operation
+ scheduleFlush(ctx, ioCallback);
+ }
+ }
+ // Here, we are releasing the opTracker to allow other operations:
+ // (searches, delete flush we will schedule, delete merge we will schedule).
+ if (deleteMemoryComponent) {
+ IOOperationUtils.waitForIoOperation(ioCallback);
+ }
+ ctx.reset();
+ ioCallback = new BlockingIOOperationCallbackWrapper(lsmIndex.getIOOperationCallback());
+ ctx.setOperation(IndexOperation.DELETE_DISK_COMPONENTS);
+ List<ILSMDiskComponent> toBeDeleted;
+ synchronized (opTracker) {
+ waitForFlushesAndMerges();
+ // Ensure that current memory component is empty and that no failed flushes happened so far
+ // This is a workaround until ASTERIXDB-2106 is fixed
+ ensureNoFailedFlush();
+ List<ILSMDiskComponent> diskComponents = lsmIndex.getDiskComponents();
+ for (ILSMDiskComponent component : diskComponents) {
+ if (predicate.test(component)) {
+ ctx.getComponentsToBeMerged().add(component);
+ }
+ }
+ if (ctx.getComponentsToBeMerged().isEmpty()) {
+ return;
+ }
+ toBeDeleted = new ArrayList<>(ctx.getComponentsToBeMerged());
+ // ScheduleMerge is actually a try operation
+ scheduleMerge(ctx, ioCallback);
+ }
+ IOOperationUtils.waitForIoOperation(ioCallback);
+ // ensure that merge has succeeded
+ for (ILSMDiskComponent component : toBeDeleted) {
+ if (lsmIndex.getDiskComponents().contains(component)) {
+ throw HyracksDataException.create(ErrorCode.A_MERGE_OPERATION_HAS_FAILED);
+ }
+ }
+ }
+
+ /**
+ * This can only be called in the steady state where:
+ * 1. no scheduled flushes
+ * 2. no incoming data
+ *
+ * @throws HyracksDataException
+ */
+ private void ensureNoFailedFlush() throws HyracksDataException {
+ for (ILSMMemoryComponent memoryComponent : lsmIndex.getMemoryComponents()) {
+ if (memoryComponent.getState() == ComponentState.READABLE_UNWRITABLE) {
+ throw HyracksDataException.create(ErrorCode.A_FLUSH_OPERATION_HAS_FAILED);
+ }
+ }
+ }
+
+ private void waitForFlushesAndMerges() throws HyracksDataException {
+ while (flushingOrMerging()) {
+ try {
+ opTracker.wait(); // NOSONAR: OpTracker is always synchronized here
+ } catch (InterruptedException e) {
+ LOGGER.log(Level.WARNING, "Interrupted while attempting component level delete", e);
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e);
+ }
+ }
+ }
+
+ private boolean flushingOrMerging() {
+ // check if flushes are taking place
+ for (ILSMMemoryComponent memComponent : lsmIndex.getMemoryComponents()) {
+ if (memComponent.getState() == ComponentState.READABLE_UNWRITABLE_FLUSHING) {
+ return true;
+ }
+ }
+ // check if merges are taking place
+ for (ILSMDiskComponent diskComponent : lsmIndex.getDiskComponents()) {
+ if (diskComponent.getState() == ComponentState.READABLE_MERGING) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
public String toString() {
return getClass().getSimpleName() + ":" + lsmIndex;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index a45225d..c0fd443 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -20,6 +20,7 @@
package org.apache.hyracks.storage.am.lsm.common.impls;
import java.util.List;
+import java.util.function.Predicate;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
@@ -30,6 +31,7 @@
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
@@ -123,7 +125,6 @@
@Override
public void merge(ILSMIOOperation operation) throws HyracksDataException {
- ctx.setOperation(IndexOperation.MERGE);
lsmHarness.merge(ctx, operation);
}
@@ -224,4 +225,14 @@
public String toString() {
return getClass().getSimpleName() + ':' + lsmHarness.toString();
}
+
+ @Override
+ public void deleteComponents(Predicate<ILSMComponent> predicate) throws HyracksDataException {
+ lsmHarness.deleteComponents(ctx, predicate);
+ }
+
+ @Override
+ public ILSMIndexOperationContext getOpContext() {
+ return ctx;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java
index 3540b84..c83d534 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java
@@ -28,19 +28,16 @@
import org.apache.hyracks.storage.common.IIndexCursor;
public class MergeOperation extends AbstractIoOperation {
-
- protected final List<ILSMComponent> mergingComponents;
protected final IIndexCursor cursor;
public MergeOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback,
- String indexIdentifier, List<ILSMComponent> mergingComponents, IIndexCursor cursor) {
+ String indexIdentifier, IIndexCursor cursor) {
super(accessor, target, callback, indexIdentifier);
- this.mergingComponents = mergingComponents;
this.cursor = cursor;
}
public List<ILSMComponent> getMergingComponents() {
- return mergingComponents;
+ return accessor.getOpContext().getComponentHolder();
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
index 6878910..7d7266e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
@@ -52,7 +52,7 @@
@Override
public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException {
- List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents());
+ List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getDiskComponents());
if (!areComponentsReadableWritableState(immutableComponents)) {
return;
@@ -140,7 +140,7 @@
* there will be no new merge either in this situation.
*/
- List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents());
+ List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getDiskComponents());
// reverse the list so that we look from the oldest to the newest components
Collections.reverse(immutableComponents);
int mergableImmutableComponentCount = getMergableImmutableComponentCount(immutableComponents);
@@ -225,7 +225,7 @@
* @throws IndexException
*/
protected boolean scheduleMerge(final ILSMIndex index) throws HyracksDataException {
- List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents());
+ List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getDiskComponents());
// Reverse the components order so that we look at components from oldest to newest.
Collections.reverse(immutableComponents);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
index d801a44..08e5f94 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.api.io.IODeviceHandle;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.util.trace.Tracer;
import org.apache.hyracks.util.trace.Tracer.Scope;
@@ -94,6 +95,11 @@
public FileReference getTarget() {
return ioOp.getTarget();
}
+
+ @Override
+ public ILSMIndexAccessor getAccessor() {
+ return ioOp.getAccessor();
+ }
}
class ComparableTracedIOOperation extends TracedIOOperation implements Comparable<ILSMIOOperation> {
@@ -122,4 +128,4 @@
+ other.getClass().getSimpleName() + " in " + getClass().getSimpleName());
return Integer.signum(hashCode() - other.hashCode());
}
-}
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
similarity index 83%
rename from hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
rename to hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
index 40017d1..d9f5c8f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.storage.am.lsm.common.utils;
+package org.apache.hyracks.storage.am.lsm.common.util;
import java.util.List;
import java.util.logging.Level;
@@ -26,19 +26,22 @@
import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
import org.apache.hyracks.storage.am.lsm.common.api.IComponentMetadata;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
-public class ComponentMetadataUtil {
+public class ComponentUtils {
- private static final Logger LOGGER = Logger.getLogger(ComponentMetadataUtil.class.getName());
+ private static final Logger LOGGER = Logger.getLogger(ComponentUtils.class.getName());
public static final MutableArrayValueReference MARKER_LSN_KEY = new MutableArrayValueReference("Marker".getBytes());
public static final long NOT_FOUND = -1L;
- private ComponentMetadataUtil() {
+ private ComponentUtils() {
}
/**
@@ -120,7 +123,7 @@
private static void fromDiskComponents(ILSMIndex index, IValueReference key, IPointable pointable)
throws HyracksDataException {
LOGGER.log(Level.INFO, "Getting " + key + " from disk components of " + index);
- for (ILSMDiskComponent c : index.getImmutableComponents()) {
+ for (ILSMDiskComponent c : index.getDiskComponents()) {
LOGGER.log(Level.INFO, "Getting " + key + " from disk components " + c);
c.getMetadata().get(key, pointable);
if (pointable.getLength() != 0) {
@@ -152,4 +155,24 @@
}
}
}
+
+ public static void markAsValid(ITreeIndex treeIndex, boolean forceToDisk) throws HyracksDataException {
+ int fileId = treeIndex.getFileId();
+ IBufferCache bufferCache = treeIndex.getBufferCache();
+ treeIndex.getPageManager().close();
+ // WARNING: flushing the metadata page should be done after releasing the write latch; otherwise, the page
+ // won't be flushed to disk because it won't be dirty until the write latch has been released.
+ // Force modified metadata page to disk.
+ // If the index is not durable, then the flush is not necessary.
+ if (forceToDisk) {
+ bufferCache.force(fileId, true);
+ }
+ }
+
+ public static void markAsValid(IBufferCache bufferCache, BloomFilter filter, boolean forceToDisk)
+ throws HyracksDataException {
+ if (forceToDisk) {
+ bufferCache.force(filter.getFileId(), true);
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/IOOperationUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/IOOperationUtils.java
new file mode 100644
index 0000000..0aeb0b9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/IOOperationUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.util;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallbackWrapper;
+
+public class IOOperationUtils {
+ private static final Logger LOGGER = Logger.getLogger(IOOperationUtils.class.getName());
+
+ private IOOperationUtils() {
+ }
+
+ public static void waitForIoOperation(BlockingIOOperationCallbackWrapper ioCallback) throws HyracksDataException {
+ // Note that the following call assumes that the io operation has succeeded.
+ try {
+ ioCallback.waitForIO();
+ } catch (InterruptedException e) {
+ LOGGER.log(Level.WARNING, "Operation has been interrupted. returning");
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e);
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 7994bf0..4cd8543 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -219,7 +219,7 @@
if (ctx.getIndexTuple() != null) {
ctx.getIndexTuple().reset(tuple);
indexTuple = ctx.getIndexTuple();
- ((InMemoryInvertedIndexAccessor)(ctx.getCurrentMutableInvIndexAccessors())).resetLogTuple(tuple);
+ ((InMemoryInvertedIndexAccessor) (ctx.getCurrentMutableInvIndexAccessors())).resetLogTuple(tuple);
} else {
indexTuple = tuple;
}
@@ -330,7 +330,7 @@
}
@Override
- public ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException {
+ public ILSMDiskComponent doFlush(ILSMIOOperation operation) throws HyracksDataException {
LSMInvertedIndexFlushOperation flushOp = (LSMInvertedIndexFlushOperation) operation;
// Create an inverted index instance to be bulk loaded.
@@ -408,13 +408,12 @@
}
@Override
- public ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException {
+ public ILSMDiskComponent doMerge(ILSMIOOperation operation) throws HyracksDataException {
LSMInvertedIndexMergeOperation mergeOp = (LSMInvertedIndexMergeOperation) operation;
IIndexCursor cursor = mergeOp.getCursor();
RangePredicate mergePred = new RangePredicate(null, null, true, true, null, null);
ILSMIndexOperationContext opCtx = ((LSMIndexSearchCursor) cursor).getOpCtx();
- opCtx.getComponentHolder().addAll(mergeOp.getMergingComponents());
// Scan diskInvertedIndexes ignoring the memoryInvertedIndex.
search(opCtx, cursor, mergePred);
@@ -620,26 +619,6 @@
}
@Override
- public void markAsValid(ILSMDiskComponent lsmComponent) throws HyracksDataException {
- LSMInvertedIndexDiskComponent invIndexComponent = (LSMInvertedIndexDiskComponent) lsmComponent;
- OnDiskInvertedIndex invIndex = (OnDiskInvertedIndex) invIndexComponent.getInvIndex();
- IBufferCache bufferCache = invIndex.getBufferCache();
- markAsValidInternal(invIndex.getBufferCache(), invIndexComponent.getBloomFilter());
-
- // Flush inverted index second.
- bufferCache.force(invIndex.getInvListsFileId(), true);
- markAsValidInternal(invIndex.getBTree());
-
- // Flush deleted keys BTree.
- markAsValidInternal(invIndexComponent.getDeletedKeysBTree());
- }
-
- @Override
- public String toString() {
- return "LSMInvertedIndex [" + fileManager.getBaseDir() + "]";
- }
-
- @Override
public boolean isPrimaryIndex() {
return false;
}
@@ -709,22 +688,20 @@
@Override
protected ILSMIOOperation createFlushOperation(AbstractLSMIndexOperationContext opCtx,
- ILSMMemoryComponent flushingComponent, LSMComponentFileReferences componentFileRefs,
- ILSMIOOperationCallback callback) throws HyracksDataException {
+ LSMComponentFileReferences componentFileRefs, ILSMIOOperationCallback callback)
+ throws HyracksDataException {
return new LSMInvertedIndexFlushOperation(new LSMInvertedIndexAccessor(getLsmHarness(), opCtx),
- flushingComponent, componentFileRefs.getInsertIndexFileReference(),
- componentFileRefs.getDeleteIndexFileReference(), componentFileRefs.getBloomFilterFileReference(),
- callback, fileManager.getBaseDir().getAbsolutePath());
+ componentFileRefs.getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(),
+ componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath());
}
@Override
protected ILSMIOOperation createMergeOperation(AbstractLSMIndexOperationContext opCtx,
- List<ILSMComponent> mergingComponents, LSMComponentFileReferences mergeFileRefs,
- ILSMIOOperationCallback callback) throws HyracksDataException {
+ LSMComponentFileReferences mergeFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException {
ILSMIndexAccessor accessor = new LSMInvertedIndexAccessor(getLsmHarness(), opCtx);
IIndexCursor cursor = new LSMInvertedIndexRangeSearchCursor(opCtx);
- return new LSMInvertedIndexMergeOperation(accessor, mergingComponents, cursor,
- mergeFileRefs.getInsertIndexFileReference(), mergeFileRefs.getDeleteIndexFileReference(),
- mergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath());
+ return new LSMInvertedIndexMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(),
+ mergeFileRefs.getDeleteIndexFileReference(), mergeFileRefs.getBloomFilterFileReference(), callback,
+ fileManager.getBaseDir().getAbsolutePath());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index dddd14a..61fc84e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -19,12 +19,14 @@
package org.apache.hyracks.storage.am.lsm.invertedindex.impls;
import java.util.List;
+import java.util.function.Predicate;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
@@ -213,4 +215,14 @@
public String toString() {
return getClass().getSimpleName() + ':' + lsmHarness.toString();
}
+
+ @Override
+ public void deleteComponents(Predicate<ILSMComponent> predicate) throws HyracksDataException {
+ lsmHarness.deleteComponents(ctx, predicate);
+ }
+
+ @Override
+ public ILSMIndexOperationContext getOpContext() {
+ return ctx;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
index 2470a39..b77f894 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
@@ -24,8 +24,10 @@
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInPlaceInvertedIndex;
import org.apache.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
public class LSMInvertedIndexDiskComponent extends AbstractLSMDiskComponent {
@@ -80,4 +82,17 @@
public String toString() {
return getClass().getSimpleName() + ":" + ((OnDiskInvertedIndex) invIndex).getInvListsFile().getRelativePath();
}
+
+ @Override
+ public void markAsValid(boolean persist) throws HyracksDataException {
+ IBufferCache bufferCache = invIndex.getBufferCache();
+ ComponentUtils.markAsValid(invIndex.getBufferCache(), bloomFilter, persist);
+
+ // Flush inverted index second.
+ bufferCache.force(((OnDiskInvertedIndex) invIndex).getInvListsFileId(), true);
+ ComponentUtils.markAsValid(((OnDiskInvertedIndex) invIndex).getBTree(), persist);
+
+ // Flush deleted keys BTree.
+ ComponentUtils.markAsValid(deletedKeysBTree, persist);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
index df4f095..2106f6a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
@@ -22,17 +22,16 @@
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
public class LSMInvertedIndexFlushOperation extends FlushOperation {
private final FileReference deletedKeysBTreeFlushTarget;
private final FileReference bloomFilterFlushTarget;
- public LSMInvertedIndexFlushOperation(ILSMIndexAccessor accessor, ILSMMemoryComponent flushingComponent,
- FileReference flushTarget, FileReference deletedKeysBTreeFlushTarget, FileReference bloomFilterFlushTarget,
+ public LSMInvertedIndexFlushOperation(ILSMIndexAccessor accessor, FileReference flushTarget,
+ FileReference deletedKeysBTreeFlushTarget, FileReference bloomFilterFlushTarget,
ILSMIOOperationCallback callback, String indexIdentifier) {
- super(accessor, flushingComponent, flushTarget, callback, indexIdentifier);
+ super(accessor, flushTarget, callback, indexIdentifier);
this.deletedKeysBTreeFlushTarget = deletedKeysBTreeFlushTarget;
this.bloomFilterFlushTarget = bloomFilterFlushTarget;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
index da374dc..2c1db0f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
@@ -19,10 +19,7 @@
package org.apache.hyracks.storage.am.lsm.invertedindex.impls;
-import java.util.List;
-
import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation;
@@ -32,10 +29,10 @@
private final FileReference deletedKeysBTreeMergeTarget;
private final FileReference bloomFilterMergeTarget;
- public LSMInvertedIndexMergeOperation(ILSMIndexAccessor accessor, List<ILSMComponent> mergingComponents,
- IIndexCursor cursor, FileReference target, FileReference deletedKeysBTreeMergeTarget,
- FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier) {
- super(accessor, target, callback, indexIdentifier, mergingComponents, cursor);
+ public LSMInvertedIndexMergeOperation(ILSMIndexAccessor accessor, IIndexCursor cursor, FileReference target,
+ FileReference deletedKeysBTreeMergeTarget, FileReference bloomFilterMergeTarget,
+ ILSMIOOperationCallback callback, String indexIdentifier) {
+ super(accessor, target, callback, indexIdentifier, cursor);
this.deletedKeysBTreeMergeTarget = deletedKeysBTreeMergeTarget;
this.bloomFilterMergeTarget = bloomFilterMergeTarget;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index eb03696..4f08dd3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -294,11 +294,6 @@
}
@Override
- public String toString() {
- return "LSMRTree [" + fileManager.getBaseDir() + "]";
- }
-
- @Override
protected void allocateMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException {
LSMRTreeMemoryComponent mutableComponent = (LSMRTreeMemoryComponent) c;
((IVirtualBufferCache) mutableComponent.getRTree().getBufferCache()).open();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
index e39c3f9..6595403 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
@@ -136,7 +136,7 @@
// is needed.
// It only needs to return the newer list
@Override
- public List<ILSMDiskComponent> getImmutableComponents() {
+ public List<ILSMDiskComponent> getDiskComponents() {
if (version == 0) {
return diskComponents;
} else {
@@ -258,12 +258,11 @@
// This can be done in a better way by creating a method boolean
// keepDeletedTuples(mergedComponents);
@Override
- public ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException {
+ public ILSMDiskComponent doMerge(ILSMIOOperation operation) throws HyracksDataException {
LSMRTreeMergeOperation mergeOp = (LSMRTreeMergeOperation) operation;
IIndexCursor cursor = mergeOp.getCursor();
ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
ILSMIndexOperationContext opCtx = ((LSMRTreeSortedCursor) cursor).getOpCtx();
- opCtx.getComponentHolder().addAll(mergeOp.getMergingComponents());
search(opCtx, cursor, rtreeSearchPred);
LSMRTreeDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getTarget(),
@@ -424,7 +423,7 @@
// Not supported
@Override
- public ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException {
+ public ILSMDiskComponent doFlush(ILSMIOOperation operation) throws HyracksDataException {
throw new UnsupportedOperationException("flush not supported in LSM-Disk-Only-RTree");
}
@@ -570,7 +569,7 @@
} else if (isTransaction) {
// Since this is a transaction component, validate and
// deactivate. it could later be added or deleted
- markAsValid(component);
+ component.markAsValid(durable);
RTree rtree = ((LSMRTreeDiskComponent) component).getRTree();
BTree btree = ((LSMRTreeDiskComponent) component).getBTree();
BloomFilter bloomFilter = ((LSMRTreeDiskComponent) component).getBloomFilter();
@@ -621,11 +620,6 @@
}
}
- @Override
- public String toString() {
- return "LSMTwoPCRTree [" + fileManager.getBaseDir() + "]";
- }
-
// The only change the the schedule merge is the method used to create the
// opCtx. first line <- in schedule merge, we->
@Override
@@ -640,7 +634,7 @@
(ILSMDiskComponent) mergingComponents.get(mergingComponents.size() - 1));
ILSMIndexAccessor accessor = new LSMRTreeAccessor(getLsmHarness(), rctx, buddyBTreeFields);
// create the merge operation.
- LSMRTreeMergeOperation mergeOp = new LSMRTreeMergeOperation(accessor, mergingComponents, cursor,
+ LSMRTreeMergeOperation mergeOp = new LSMRTreeMergeOperation(accessor, cursor,
relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getDeleteIndexFileReference(),
relMergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath());
ioScheduler.scheduleOperation(mergeOp);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 0a47aea..ca0e4e1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -56,7 +56,6 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
@@ -163,7 +162,7 @@
}
@Override
- public ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException {
+ public ILSMDiskComponent doFlush(ILSMIOOperation operation) throws HyracksDataException {
LSMRTreeFlushOperation flushOp = (LSMRTreeFlushOperation) operation;
LSMRTreeMemoryComponent flushingComponent = (LSMRTreeMemoryComponent) flushOp.getFlushingComponent();
// Renaming order is critical because we use assume ordering when we
@@ -263,19 +262,15 @@
}
@Override
- public ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException {
+ public ILSMDiskComponent doMerge(ILSMIOOperation operation) throws HyracksDataException {
LSMRTreeMergeOperation mergeOp = (LSMRTreeMergeOperation) operation;
IIndexCursor cursor = mergeOp.getCursor();
ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
ILSMIndexOperationContext opCtx = ((LSMRTreeSortedCursor) cursor).getOpCtx();
- opCtx.getComponentHolder().addAll(mergeOp.getMergingComponents());
search(opCtx, cursor, rtreeSearchPred);
-
LSMRTreeDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getTarget(),
mergeOp.getBTreeTarget(), mergeOp.getBloomFilterTarget(), true);
-
ILSMDiskComponentBulkLoader componentBulkLoader;
-
// In case we must keep the deleted-keys BTrees, then they must be merged *before* merging the r-trees so that
// lsmHarness.endSearch() is called once when the r-trees have been merged.
if (mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1) != diskComponents
@@ -409,14 +404,6 @@
}
@Override
- public void markAsValid(ILSMDiskComponent lsmComponent) throws HyracksDataException {
- LSMRTreeDiskComponent component = (LSMRTreeDiskComponent) lsmComponent;
- markAsValidInternal(component.getBTree().getBufferCache(), component.getBloomFilter());
- markAsValidInternal((component).getBTree());
- markAsValidInternal((component).getRTree());
- }
-
- @Override
public Set<String> getLSMComponentPhysicalFiles(ILSMComponent lsmComponent) {
Set<String> files = new HashSet<>();
LSMRTreeDiskComponent component = (LSMRTreeDiskComponent) lsmComponent;
@@ -428,22 +415,21 @@
@Override
protected ILSMIOOperation createFlushOperation(AbstractLSMIndexOperationContext opCtx,
- ILSMMemoryComponent flushingComponent, LSMComponentFileReferences componentFileRefs,
- ILSMIOOperationCallback callback) throws HyracksDataException {
+ LSMComponentFileReferences componentFileRefs, ILSMIOOperationCallback callback)
+ throws HyracksDataException {
LSMRTreeAccessor accessor = new LSMRTreeAccessor(getLsmHarness(), opCtx, buddyBTreeFields);
- return new LSMRTreeFlushOperation(accessor, flushingComponent, componentFileRefs.getInsertIndexFileReference(),
+ return new LSMRTreeFlushOperation(accessor, componentFileRefs.getInsertIndexFileReference(),
componentFileRefs.getDeleteIndexFileReference(), componentFileRefs.getBloomFilterFileReference(),
callback, fileManager.getBaseDir().getAbsolutePath());
}
@Override
protected ILSMIOOperation createMergeOperation(AbstractLSMIndexOperationContext opCtx,
- List<ILSMComponent> mergingComponents, LSMComponentFileReferences mergeFileRefs,
- ILSMIOOperationCallback callback) throws HyracksDataException {
+ LSMComponentFileReferences mergeFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException {
ITreeIndexCursor cursor = new LSMRTreeSortedCursor(opCtx, linearizer, buddyBTreeFields);
ILSMIndexAccessor accessor = new LSMRTreeAccessor(getLsmHarness(), opCtx, buddyBTreeFields);
- return new LSMRTreeMergeOperation(accessor, mergingComponents, cursor,
- mergeFileRefs.getInsertIndexFileReference(), mergeFileRefs.getDeleteIndexFileReference(),
- mergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath());
+ return new LSMRTreeMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(),
+ mergeFileRefs.getDeleteIndexFileReference(), mergeFileRefs.getBloomFilterFileReference(), callback,
+ fileManager.getBaseDir().getAbsolutePath());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
index 54ef122..0f7943d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
import org.apache.hyracks.storage.am.rtree.impls.RTree;
public class LSMRTreeDiskComponent extends AbstractLSMDiskComponent {
@@ -81,4 +82,15 @@
public String toString() {
return getClass().getSimpleName() + ":" + rtree.getFileReference().getRelativePath();
}
+
+ @Override
+ public void markAsValid(boolean persist) throws HyracksDataException {
+ if (bloomFilter != null) {
+ ComponentUtils.markAsValid(btree.getBufferCache(), bloomFilter, persist);
+ }
+ if (btree != null) {
+ ComponentUtils.markAsValid(btree, persist);
+ }
+ ComponentUtils.markAsValid(rtree, persist);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
index f3e45ac..6991c56 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
@@ -21,7 +21,6 @@
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
public class LSMRTreeFlushOperation extends FlushOperation {
@@ -29,10 +28,9 @@
private final FileReference btreeFlushTarget;
private final FileReference bloomFilterFlushTarget;
- public LSMRTreeFlushOperation(ILSMIndexAccessor accessor, ILSMMemoryComponent flushingComponent,
- FileReference flushTarget, FileReference btreeFlushTarget, FileReference bloomFilterFlushTarget,
- ILSMIOOperationCallback callback, String indexIdentifier) {
- super(accessor, flushingComponent, flushTarget, callback, indexIdentifier);
+ public LSMRTreeFlushOperation(ILSMIndexAccessor accessor, FileReference flushTarget, FileReference btreeFlushTarget,
+ FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier) {
+ super(accessor, flushTarget, callback, indexIdentifier);
this.btreeFlushTarget = btreeFlushTarget;
this.bloomFilterFlushTarget = bloomFilterFlushTarget;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
index 9b3aa0c..83872cf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
@@ -18,11 +18,8 @@
*/
package org.apache.hyracks.storage.am.lsm.rtree.impls;
-import java.util.List;
-
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation;
@@ -31,10 +28,10 @@
private final FileReference btreeMergeTarget;
private final FileReference bloomFilterMergeTarget;
- public LSMRTreeMergeOperation(ILSMIndexAccessor accessor, List<ILSMComponent> mergingComponents,
- ITreeIndexCursor cursor, FileReference target, FileReference btreeMergeTarget,
- FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier) {
- super(accessor, target, callback, indexIdentifier, mergingComponents, cursor);
+ public LSMRTreeMergeOperation(ILSMIndexAccessor accessor, ITreeIndexCursor cursor, FileReference target,
+ FileReference btreeMergeTarget, FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback,
+ String indexIdentifier) {
+ super(accessor, target, callback, indexIdentifier, cursor);
this.btreeMergeTarget = btreeMergeTarget;
this.bloomFilterMergeTarget = bloomFilterMergeTarget;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 94648fb..1e15455 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -47,7 +47,6 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
@@ -124,7 +123,7 @@
}
@Override
- public ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException {
+ public ILSMDiskComponent doFlush(ILSMIOOperation operation) throws HyracksDataException {
LSMRTreeFlushOperation flushOp = (LSMRTreeFlushOperation) operation;
// Renaming order is critical because we use assume ordering when we
// read the file names when we open the tree.
@@ -212,12 +211,11 @@
}
@Override
- public ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException {
+ public ILSMDiskComponent doMerge(ILSMIOOperation operation) throws HyracksDataException {
MergeOperation mergeOp = (MergeOperation) operation;
IIndexCursor cursor = mergeOp.getCursor();
ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
ILSMIndexOperationContext opCtx = ((LSMIndexSearchCursor) cursor).getOpCtx();
- opCtx.getComponentHolder().addAll(mergeOp.getMergingComponents());
search(opCtx, cursor, rtreeSearchPred);
// Bulk load the tuples from all on-disk RTrees into the new RTree.
@@ -321,12 +319,6 @@
}
@Override
- public void markAsValid(ILSMDiskComponent lsmComponent) throws HyracksDataException {
- RTree rtree = ((LSMRTreeDiskComponent) lsmComponent).getRTree();
- markAsValidInternal(rtree);
- }
-
- @Override
public Set<String> getLSMComponentPhysicalFiles(ILSMComponent lsmComponent) {
Set<String> files = new HashSet<>();
RTree rtree = ((LSMRTreeDiskComponent) lsmComponent).getRTree();
@@ -336,24 +328,24 @@
@Override
protected ILSMIOOperation createFlushOperation(AbstractLSMIndexOperationContext opCtx,
- ILSMMemoryComponent flushingComponent, LSMComponentFileReferences componentFileRefs,
- ILSMIOOperationCallback callback) throws HyracksDataException {
+ LSMComponentFileReferences componentFileRefs, ILSMIOOperationCallback callback)
+ throws HyracksDataException {
ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory);
- return new LSMRTreeFlushOperation(accessor, flushingComponent, componentFileRefs.getInsertIndexFileReference(),
- null, null, callback, fileManager.getBaseDir().getAbsolutePath());
+ return new LSMRTreeFlushOperation(accessor, componentFileRefs.getInsertIndexFileReference(), null, null,
+ callback, fileManager.getBaseDir().getAbsolutePath());
}
@Override
protected ILSMIOOperation createMergeOperation(AbstractLSMIndexOperationContext opCtx,
- List<ILSMComponent> mergingComponents, LSMComponentFileReferences mergeFileRefs,
- ILSMIOOperationCallback callback) throws HyracksDataException {
+ LSMComponentFileReferences mergeFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException {
boolean returnDeletedTuples = false;
+ List<ILSMComponent> mergingComponents = opCtx.getComponentHolder();
if (mergingComponents.get(mergingComponents.size() - 1) != diskComponents.get(diskComponents.size() - 1)) {
returnDeletedTuples = true;
}
ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor(opCtx, returnDeletedTuples);
ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory);
return new MergeOperation(accessor, mergeFileRefs.getInsertIndexFileReference(), callback,
- fileManager.getBaseDir().getAbsolutePath(), mergingComponents, cursor);
+ fileManager.getBaseDir().getAbsolutePath(), cursor);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/pom.xml b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/pom.xml
index 55c5710..597ce59 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/pom.xml
@@ -36,6 +36,27 @@
<skip>true</skip>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <includes>
+ <include>**/*.class</include>
+ <include>**/*.properties</include>
+ <include>**/README*</include>
+ <include>**/NOTICE*</include>
+ <include>**/LICENSE*</include>
+ <include>**/DEPENDENCIES*</include>
+ </includes>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java
index fd76a10..5d6d8de 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java
@@ -76,12 +76,12 @@
// Make sure the disk component was generated
LSMBTree btree = (LSMBTree) ctx.getIndex();
- Assert.assertEquals("Check disk components", 1, btree.getImmutableComponents().size());
+ Assert.assertEquals("Check disk components", 1, btree.getDiskComponents().size());
ctx.getIndex().deactivate();
// Delete the btree file and keep the bloom filter file from the disk component
- LSMBTreeDiskComponent ilsmDiskComponent = (LSMBTreeDiskComponent) btree.getImmutableComponents().get(0);
+ LSMBTreeDiskComponent ilsmDiskComponent = (LSMBTreeDiskComponent) btree.getDiskComponents().get(0);
ilsmDiskComponent.getBTree().getFileReference().delete();
File bloomFilterFile = ilsmDiskComponent.getBloomFilter().getFileReference().getFile().getAbsoluteFile();
@@ -90,6 +90,6 @@
// Activating the index again should delete the orphaned bloom filter file as well as the disk component
ctx.getIndex().activate();
Assert.assertEquals("Check bloom filter file deleted", false, bloomFilterFile.exists());
- Assert.assertEquals("Check disk components", 0, btree.getImmutableComponents().size());
+ Assert.assertEquals("Check disk components", 0, btree.getDiskComponents().size());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFilterMergeTestDriver.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFilterMergeTestDriver.java
index 6ffdc7d..c5eb97c 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFilterMergeTestDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFilterMergeTestDriver.java
@@ -129,7 +129,7 @@
}
}
- List<ILSMDiskComponent> flushedComponents = ((LSMBTree) ctx.getIndex()).getImmutableComponents();
+ List<ILSMDiskComponent> flushedComponents = ((LSMBTree) ctx.getIndex()).getDiskComponents();
MutablePair<ITupleReference, ITupleReference> expectedMergeMinMax = null;
for (ILSMDiskComponent f : flushedComponents) {
Pair<ITupleReference, ITupleReference> componentMinMax = filterToMinMax(f.getLSMComponentFilter());
@@ -146,9 +146,9 @@
}
}
accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
- ((LSMBTree) ctx.getIndex()).getImmutableComponents());
+ ((LSMBTree) ctx.getIndex()).getDiskComponents());
- flushedComponents = ((LSMBTree) ctx.getIndex()).getImmutableComponents();
+ flushedComponents = ((LSMBTree) ctx.getIndex()).getDiskComponents();
Pair<ITupleReference, ITupleReference> mergedMinMax =
filterToMinMax(flushedComponents.get(0).getLSMComponentFilter());
Assert.assertEquals(0, TreeIndexTestUtils.compareFilterTuples(expectedMergeMinMax.getLeft(),
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
index 1df0d39..7dac1e5 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
@@ -76,7 +76,7 @@
ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
- ((LSMBTree) ctx.getIndex()).getImmutableComponents());
+ ((LSMBTree) ctx.getIndex()).getDiskComponents());
orderedIndexTestUtils.checkPointSearches(ctx);
orderedIndexTestUtils.checkScan(ctx);
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java
index a8eff99..acbeaef 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java
@@ -129,7 +129,7 @@
accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback());
LSMBTree btree = (LSMBTree) ctx.getIndex();
- Assert.assertEquals("Check disk components", 3, btree.getImmutableComponents().size());
+ Assert.assertEquals("Check disk components", 3, btree.getDiskComponents().size());
IIndexCursor cursor = accessor.createSearchCursor(false);
accessor.scanDiskComponents(cursor);
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceScanDiskComponentsTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceScanDiskComponentsTest.java
index 71d6310..fbcbcc2 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceScanDiskComponentsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceScanDiskComponentsTest.java
@@ -355,9 +355,9 @@
accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback());
LSMBTree btree = (LSMBTree) ctx.getIndex();
- Assert.assertEquals("Check disk components", 1, btree.getImmutableComponents().size());
+ Assert.assertEquals("Check disk components", 1, btree.getDiskComponents().size());
- LSMBTreeDiskComponent btreeComponent = (LSMBTreeDiskComponent) btree.getImmutableComponents().get(0);
+ LSMBTreeDiskComponent btreeComponent = (LSMBTreeDiskComponent) btree.getDiskComponents().get(0);
BTree.BTreeAccessor btreeAccessor = (BTree.BTreeAccessor) btreeComponent.getBTree()
.createAccessor(TestOperationCallback.INSTANCE, TestOperationCallback.INSTANCE);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentState.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java
similarity index 82%
rename from hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentState.java
rename to hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java
index 094b6c6..2989af9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentState.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java
@@ -16,12 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.hyracks.storage.am.lsm.btree.impl;
-package org.apache.hyracks.storage.am.lsm.common.impls;
+import java.util.concurrent.Semaphore;
-public enum LSMComponentState {
- FLUSHING,
- MERGING,
- DONE_FLUSHING,
- DONE_MERGING
+public interface ITestOpCallback {
+ void callback(Semaphore smeaphore);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
new file mode 100644
index 0000000..79f4e22
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory;
+import org.apache.hyracks.storage.am.btree.impls.BTree;
+import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
+import org.apache.hyracks.storage.am.lsm.common.api.IComponentFilterHelper;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
+import org.apache.hyracks.util.trace.Tracer;
+
+public class TestLsmBtree extends LSMBTree {
+
+ // Semaphores are used to control operations
+ private final Semaphore modifySemaphore = new Semaphore(0);
+ private final Semaphore searchSemaphore = new Semaphore(0);
+ private final Semaphore flushSemaphore = new Semaphore(0);
+ private final Semaphore mergeSemaphore = new Semaphore(0);
+ private final List<ITestOpCallback> modifyCallbacks = new ArrayList<>();
+ private final List<ITestOpCallback> searchCallbacks = new ArrayList<>();
+ private final List<ITestOpCallback> flushCallbacks = new ArrayList<>();
+ private final List<ITestOpCallback> mergeCallbacks = new ArrayList<>();
+
+ private volatile int numScheduledFlushes;
+ private volatile int numStartedFlushes;
+ private volatile int numFinishedFlushes;
+ private volatile int numScheduledMerges;
+ private volatile int numFinishedMerges;
+ private volatile int numStartedMerges;
+
+ public TestLsmBtree(IIOManager ioManager, List<IVirtualBufferCache> virtualBufferCaches,
+ ITreeIndexFrameFactory interiorFrameFactory, ITreeIndexFrameFactory insertLeafFrameFactory,
+ ITreeIndexFrameFactory deleteLeafFrameFactory, ILSMIndexFileManager fileManager,
+ TreeIndexFactory<BTree> diskBTreeFactory, TreeIndexFactory<BTree> bulkLoadBTreeFactory,
+ BloomFilterFactory bloomFilterFactory, IComponentFilterHelper filterHelper,
+ ILSMComponentFilterFrameFactory filterFrameFactory, LSMComponentFilterManager filterManager,
+ double bloomFilterFalsePositiveRate, int fieldCount, IBinaryComparatorFactory[] cmpFactories,
+ ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
+ ILSMIOOperationCallback ioOpCallback, boolean needKeyDupCheck, int[] btreeFields, int[] filterFields,
+ boolean durable, boolean updateAware, Tracer tracer) throws HyracksDataException {
+ super(ioManager, virtualBufferCaches, interiorFrameFactory, insertLeafFrameFactory, deleteLeafFrameFactory,
+ fileManager, diskBTreeFactory, bulkLoadBTreeFactory, bloomFilterFactory, filterHelper,
+ filterFrameFactory, filterManager, bloomFilterFalsePositiveRate, fieldCount, cmpFactories, mergePolicy,
+ opTracker, ioScheduler, ioOpCallback, needKeyDupCheck, btreeFields, filterFields, durable, updateAware,
+ tracer);
+ }
+
+ @Override
+ public void modify(IIndexOperationContext ictx, ITupleReference tuple) throws HyracksDataException {
+ synchronized (modifyCallbacks) {
+ for (ITestOpCallback callback : modifyCallbacks) {
+ callback(callback, modifySemaphore);
+ }
+ }
+ acquire(modifySemaphore);
+ super.modify(ictx, tuple);
+ }
+
+ public static void callback(ITestOpCallback callback, Semaphore semaphore) {
+ if (callback != null) {
+ callback.callback(semaphore);
+ }
+ }
+
+ @Override
+ public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException {
+ super.scheduleFlush(ctx, callback);
+ numScheduledFlushes++;
+ }
+
+ @Override
+ public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException {
+ super.scheduleMerge(ctx, callback);
+ numScheduledMerges++;
+ }
+
+ @Override
+ public ILSMDiskComponent doFlush(ILSMIOOperation operation) throws HyracksDataException {
+ numStartedFlushes++;
+ synchronized (flushCallbacks) {
+ for (ITestOpCallback callback : flushCallbacks) {
+ callback(callback, flushSemaphore);
+ }
+ }
+ acquire(flushSemaphore);
+ ILSMDiskComponent c = super.doFlush(operation);
+ numFinishedFlushes++;
+ return c;
+ }
+
+ @Override
+ public ILSMDiskComponent doMerge(ILSMIOOperation operation) throws HyracksDataException {
+ numStartedMerges++;
+ synchronized (mergeCallbacks) {
+ for (ITestOpCallback callback : mergeCallbacks) {
+ callback(callback, mergeSemaphore);
+ }
+ }
+ acquire(mergeSemaphore);
+ ILSMDiskComponent c = super.doMerge(operation);
+ numFinishedMerges++;
+ return c;
+ }
+
+ private void acquire(Semaphore semaphore) throws HyracksDataException {
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ public void allowModify(int permits) {
+ modifySemaphore.release(permits);
+ }
+
+ public void allowSearch(int permits) {
+ searchSemaphore.release(permits);
+ }
+
+ public void allowFlush(int permits) {
+ flushSemaphore.release(permits);
+ }
+
+ public void allowMerge(int permits) {
+ mergeSemaphore.release(permits);
+ }
+
+ @Override
+ public ILSMIndexAccessor createAccessor(AbstractLSMIndexOperationContext opCtx) {
+ return new LSMTreeIndexAccessor(getLsmHarness(), opCtx, ctx -> new TestLsmBtreeSearchCursor(ctx, this));
+ }
+
+ public int getNumScheduledFlushes() {
+ return numScheduledFlushes;
+ }
+
+ public int getNumStartedFlushes() {
+ return numStartedFlushes;
+ }
+
+ public int getNumScheduledMerges() {
+ return numScheduledMerges;
+ }
+
+ public int getNumStartedMerges() {
+ return numStartedMerges;
+ }
+
+ public int getNumFinishedFlushes() {
+ return numFinishedFlushes;
+ }
+
+ public int getNumFinishedMerges() {
+ return numFinishedMerges;
+ }
+
+ public List<ITestOpCallback> getModifyCallbacks() {
+ return modifyCallbacks;
+ }
+
+ public void addModifyCallback(ITestOpCallback modifyCallback) {
+ synchronized (mergeCallbacks) {
+ modifyCallbacks.add(modifyCallback);
+ }
+ }
+
+ public void clearModifyCallbacks() {
+ synchronized (mergeCallbacks) {
+ modifyCallbacks.clear();
+ }
+ }
+
+ public List<ITestOpCallback> getSearchCallbacks() {
+ return searchCallbacks;
+ }
+
+ public void clearSearchCallbacks() {
+ synchronized (searchCallbacks) {
+ searchCallbacks.clear();
+ }
+ }
+
+ public void addSearchCallback(ITestOpCallback searchCallback) {
+ synchronized (searchCallbacks) {
+ searchCallbacks.add(searchCallback);
+ }
+ }
+
+ public void addFlushCallback(ITestOpCallback flushCallback) {
+ synchronized (flushCallbacks) {
+ flushCallbacks.add(flushCallback);
+ }
+ }
+
+ public void clearFlushCallbacks() {
+ synchronized (flushCallbacks) {
+ flushCallbacks.clear();
+ }
+ }
+
+ public void addMergeCallback(ITestOpCallback mergeCallback) {
+ synchronized (mergeCallbacks) {
+ mergeCallbacks.add(mergeCallback);
+ }
+ }
+
+ public void clearMergeCallbacks() {
+ synchronized (mergeCallbacks) {
+ mergeCallbacks.clear();
+ }
+ }
+
+ public Semaphore getSearchSemaphore() {
+ return searchSemaphore;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
new file mode 100644
index 0000000..702a69d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.impl;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeLocalResource;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
+import org.apache.hyracks.storage.common.IStorageManager;
+
+public class TestLsmBtreeLocalResource extends LSMBTreeLocalResource {
+ private static final long serialVersionUID = 1L;
+
+ public TestLsmBtreeLocalResource(ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories,
+ int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate, boolean isPrimary, String path,
+ IStorageManager storageManager, ILSMMergePolicyFactory mergePolicyFactory,
+ Map<String, String> mergePolicyProperties, ITypeTraits[] filterTypeTraits,
+ IBinaryComparatorFactory[] filterCmpFactories, int[] btreeFields, int[] filterFields,
+ ILSMOperationTrackerFactory opTrackerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
+ IMetadataPageManagerFactory metadataPageManagerFactory, IVirtualBufferCacheProvider vbcProvider,
+ ILSMIOOperationSchedulerProvider ioSchedulerProvider, boolean durable) {
+ super(typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, path,
+ storageManager, mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories,
+ btreeFields, filterFields, opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory,
+ vbcProvider, ioSchedulerProvider, durable);
+ }
+
+ @Override
+ public ILSMIndex createInstance(INCServiceContext serviceCtx) throws HyracksDataException {
+ IIOManager ioManager = serviceCtx.getIoManager();
+ FileReference file = ioManager.resolve(path);
+ List<IVirtualBufferCache> vbcs = vbcProvider.getVirtualBufferCaches(serviceCtx, file);
+ return TestLsmBtreeUtil.createLSMTree(ioManager, vbcs, file, storageManager.getBufferCache(serviceCtx),
+ typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
+ mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
+ opTrackerProvider.getOperationTracker(serviceCtx), ioSchedulerProvider.getIoScheduler(serviceCtx),
+ ioOpCallbackFactory.createIoOpCallback(), isPrimary, filterTypeTraits, filterCmpFactories, btreeFields,
+ filterFields, durable, metadataPageManagerFactory, false, serviceCtx.getTracer());
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
new file mode 100644
index 0000000..6b13f56
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.impl;
+
+import java.util.Map;
+
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeLocalResourceFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
+import org.apache.hyracks.storage.am.lsm.common.dataflow.LsmResource;
+import org.apache.hyracks.storage.common.IStorageManager;
+
+public class TestLsmBtreeLocalResourceFactory extends LSMBTreeLocalResourceFactory {
+ private static final long serialVersionUID = 1L;
+
+ public TestLsmBtreeLocalResourceFactory(IStorageManager storageManager, ITypeTraits[] typeTraits,
+ IBinaryComparatorFactory[] cmpFactories, ITypeTraits[] filterTypeTraits,
+ IBinaryComparatorFactory[] filterCmpFactories, int[] filterFields,
+ ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
+ IMetadataPageManagerFactory metadataPageManagerFactory, IVirtualBufferCacheProvider vbcProvider,
+ ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMMergePolicyFactory mergePolicyFactory,
+ Map<String, String> mergePolicyProperties, boolean durable, int[] bloomFilterKeyFields,
+ double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields) {
+ super(storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields,
+ opTrackerFactory, ioOpCallbackFactory, metadataPageManagerFactory, vbcProvider, ioSchedulerProvider,
+ mergePolicyFactory, mergePolicyProperties, durable, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
+ isPrimary, btreeFields);
+ }
+
+ @Override
+ public LsmResource createResource(FileReference fileRef) {
+ return new TestLsmBtreeLocalResource(typeTraits, cmpFactories, bloomFilterKeyFields,
+ bloomFilterFalsePositiveRate, isPrimary, fileRef.getRelativePath(), storageManager, mergePolicyFactory,
+ mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields, filterFields,
+ opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, vbcProvider, ioSchedulerProvider,
+ durable);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeSearchCursor.java
new file mode 100644
index 0000000..d3504ae
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeSearchCursor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.impl;
+
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeSearchCursor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+
+public class TestLsmBtreeSearchCursor extends LSMBTreeSearchCursor {
+
+ private final TestLsmBtree lsmBtree;
+
+ public TestLsmBtreeSearchCursor(ILSMIndexOperationContext opCtx, TestLsmBtree lsmBtree) {
+ super(opCtx);
+ this.lsmBtree = lsmBtree;
+ }
+
+ @Override
+ public void next() throws HyracksDataException {
+ try {
+ List<ITestOpCallback> callbacks = lsmBtree.getSearchCallbacks();
+ synchronized (callbacks) {
+ for (ITestOpCallback cb : callbacks) {
+ TestLsmBtree.callback(cb, lsmBtree.getSearchSemaphore());
+ }
+ }
+ lsmBtree.getSearchSemaphore().acquire();
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ super.next();
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeUtil.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeUtil.java
new file mode 100644
index 0000000..940e517
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeUtil.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.impl;
+
+import java.util.List;
+
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory;
+import org.apache.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
+import org.apache.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
+import org.apache.hyracks.storage.am.btree.impls.BTree;
+import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import org.apache.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeFileManager;
+import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeCopyTupleWriterFactory;
+import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleWriterFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import org.apache.hyracks.storage.am.lsm.common.frames.LSMComponentFilterFrameFactory;
+import org.apache.hyracks.storage.am.lsm.common.impls.BTreeFactory;
+import org.apache.hyracks.storage.am.lsm.common.impls.ComponentFilterHelper;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.util.trace.Tracer;
+
+public class TestLsmBtreeUtil {
+
+ private TestLsmBtreeUtil() {
+ }
+
+ public static LSMBTree createLSMTree(IIOManager ioManager, List<IVirtualBufferCache> virtualBufferCaches,
+ FileReference file, IBufferCache diskBufferCache, ITypeTraits[] typeTraits,
+ IBinaryComparatorFactory[] cmpFactories, int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate,
+ ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
+ ILSMIOOperationCallback ioOpCallback, boolean needKeyDupCheck, ITypeTraits[] filterTypeTraits,
+ IBinaryComparatorFactory[] filterCmpFactories, int[] btreeFields, int[] filterFields, boolean durable,
+ IMetadataPageManagerFactory freePageManagerFactory, boolean updateAware, Tracer tracer)
+ throws HyracksDataException {
+ LSMBTreeTupleWriterFactory insertTupleWriterFactory =
+ new LSMBTreeTupleWriterFactory(typeTraits, cmpFactories.length, false, updateAware);
+ LSMBTreeTupleWriterFactory deleteTupleWriterFactory =
+ new LSMBTreeTupleWriterFactory(typeTraits, cmpFactories.length, true, updateAware);
+ LSMBTreeCopyTupleWriterFactory copyTupleWriterFactory =
+ new LSMBTreeCopyTupleWriterFactory(typeTraits, cmpFactories.length, updateAware);
+ ITreeIndexFrameFactory insertLeafFrameFactory = new BTreeNSMLeafFrameFactory(insertTupleWriterFactory);
+ ITreeIndexFrameFactory copyTupleLeafFrameFactory = new BTreeNSMLeafFrameFactory(copyTupleWriterFactory);
+ ITreeIndexFrameFactory deleteLeafFrameFactory = new BTreeNSMLeafFrameFactory(deleteTupleWriterFactory);
+ ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(insertTupleWriterFactory);
+
+ TreeIndexFactory<BTree> diskBTreeFactory = new BTreeFactory(ioManager, diskBufferCache, freePageManagerFactory,
+ interiorFrameFactory, copyTupleLeafFrameFactory, cmpFactories, typeTraits.length);
+ TreeIndexFactory<BTree> bulkLoadBTreeFactory = new BTreeFactory(ioManager, diskBufferCache,
+ freePageManagerFactory, interiorFrameFactory, insertLeafFrameFactory, cmpFactories, typeTraits.length);
+
+ BloomFilterFactory bloomFilterFactory =
+ needKeyDupCheck ? new BloomFilterFactory(diskBufferCache, bloomFilterKeyFields) : null;
+
+ ComponentFilterHelper filterHelper = null;
+ LSMComponentFilterFrameFactory filterFrameFactory = null;
+ LSMComponentFilterManager filterManager = null;
+ if (filterCmpFactories != null) {
+ TypeAwareTupleWriterFactory filterTupleWriterFactory = new TypeAwareTupleWriterFactory(filterTypeTraits);
+ filterHelper = new ComponentFilterHelper(filterTupleWriterFactory, filterCmpFactories);
+ filterFrameFactory = new LSMComponentFilterFrameFactory(filterTupleWriterFactory);
+ filterManager = new LSMComponentFilterManager(filterFrameFactory);
+ }
+
+ //Primary LSMBTree index has a BloomFilter.
+ ILSMIndexFileManager fileNameManager =
+ new LSMBTreeFileManager(ioManager, file, diskBTreeFactory, needKeyDupCheck);
+
+ return new TestLsmBtree(ioManager, virtualBufferCaches, interiorFrameFactory, insertLeafFrameFactory,
+ deleteLeafFrameFactory, fileNameManager, diskBTreeFactory, bulkLoadBTreeFactory, bloomFilterFactory,
+ filterHelper, filterFrameFactory, filterManager, bloomFilterFalsePositiveRate, typeTraits.length,
+ cmpFactories, mergePolicy, opTracker, ioScheduler, ioOpCallback, needKeyDupCheck, btreeFields,
+ filterFields, durable, updateAware, tracer);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
index 024b72d..1667e47 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
@@ -119,7 +119,7 @@
case MERGE:
accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
- lsmBTree.getImmutableComponents());
+ lsmBTree.getDiskComponents());
break;
default:
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java
index 04dc736..e521b4b 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java
@@ -191,7 +191,7 @@
}
ILSMIndex index = Mockito.mock(ILSMIndex.class);
- Mockito.when(index.getImmutableComponents()).thenReturn(components);
+ Mockito.when(index.getDiskComponents()).thenReturn(components);
ILSMIndexAccessor accessor = Mockito.mock(ILSMIndexAccessor.class);
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
index c6520d5..d093aac 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
@@ -58,7 +58,7 @@
}
// Perform merge.
invIndexAccessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
- ((LSMInvertedIndex) invIndex).getImmutableComponents());
+ ((LSMInvertedIndex) invIndex).getDiskComponents());
validateAndCheckIndex(testCtx);
runTinySearchWorkload(testCtx, tupleGen);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
index cf66fb8..3dc7262 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
@@ -60,7 +60,7 @@
}
// Perform merge.
invIndexAccessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
- ((LSMInvertedIndex) invIndex).getImmutableComponents());
+ ((LSMInvertedIndex) invIndex).getDiskComponents());
validateAndCheckIndex(testCtx);
runTinySearchWorkload(testCtx, tupleGen);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
index d5267b5..2345698 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
@@ -116,7 +116,7 @@
case MERGE: {
accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
- invIndex.getImmutableComponents());
+ invIndex.getDiskComponents());
break;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
index ae4a493..9209a3e 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
@@ -78,7 +78,7 @@
ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
- ((AbstractLSMRTree) ctx.getIndex()).getImmutableComponents());
+ ((AbstractLSMRTree) ctx.getIndex()).getDiskComponents());
rTreeTestUtils.checkScan(ctx);
rTreeTestUtils.checkDiskOrderScan(ctx);
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
index 95b4f44..fe4870b 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
@@ -79,7 +79,7 @@
case MERGE:
accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
- lsmRTree.getImmutableComponents());
+ lsmRTree.getDiskComponents());
break;
default:
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
index 3917203..2855f2e 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
@@ -68,7 +68,7 @@
case MERGE:
accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
- ((AbstractLSMRTree) lsmRTree).getImmutableComponents());
+ ((AbstractLSMRTree) lsmRTree).getDiskComponents());
break;
default: