[ASTERIXDB-3318][RUN] Create index builder factory per partition
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
The LSMIndexIOOperationCallbackFactory instance is currently shared among
index builder factories. Because the factory is stateful, index builders
that run concurrently and without synchronization can interfere with each
other through this shared state. This change creates one index builder
factory per partition so that each builder uses its own instance.
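For context, a minimal, self-contained Java sketch (not AsterixDB code; the
StatefulFactory class, its field, and the Builder interface are illustrative
stand-ins for the stateful LSMIndexIOOperationCallbackFactory and the index
builders) showing why a single shared stateful factory is unsafe under
concurrent builders, and how per-partition instances avoid the race:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class PerPartitionFactorySketch {

        // Hypothetical stand-in for a stateful factory: create() mutates
        // internal state on every call.
        static class StatefulFactory {
            private int sequence; // unsynchronized mutable state

            int create() {
                return ++sequence; // lost updates possible if called concurrently
            }
        }

        // One "builder" task per partition.
        interface Builder {
            void build(int partition);
        }

        public static void main(String[] args) throws Exception {
            int partitions = 4;
            ExecutorService pool = Executors.newFixedThreadPool(partitions);

            // Problematic pattern: one factory shared by all concurrently
            // running builders; unsynchronized increments can be lost.
            StatefulFactory shared = new StatefulFactory();
            runBuilders(pool, partitions, p -> shared.create());
            System.out.println("shared sequence: " + shared.sequence); // may be < 4

            // Pattern adopted by this change: one factory per partition, so
            // each builder mutates only its own instance and no race exists.
            StatefulFactory[] perPartition = new StatefulFactory[partitions];
            for (int p = 0; p < partitions; p++) {
                perPartition[p] = new StatefulFactory();
            }
            runBuilders(pool, partitions, p -> perPartition[p].create());

            pool.shutdown();
        }

        // Runs one builder task per partition and waits for all of them.
        static void runBuilders(ExecutorService pool, int partitions, Builder builder)
                throws InterruptedException {
            CountDownLatch done = new CountDownLatch(partitions);
            for (int p = 0; p < partitions; p++) {
                final int partition = p;
                pool.submit(() -> {
                    builder.build(partition);
                    done.countDown();
                });
            }
            done.await();
        }
    }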
Change-Id: I4c3b0b982b206ac3eea3653c68e3fecc31145cda
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17976
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 93802c2..43f40eb 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -339,7 +339,8 @@
JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset);
- FileSplit[] fs = partitioningProperties.getSplitsProvider().getFileSplits();
+ IFileSplitProvider splitsProvider = partitioningProperties.getSplitsProvider();
+ FileSplit[] fs = splitsProvider.getFileSplits();
StringBuilder sb = new StringBuilder();
for (FileSplit f : fs) {
sb.append(f).append(" ");
@@ -349,19 +350,37 @@
DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
// prepare a LocalResourceMetadata which will be stored in NC's local resource
// repository
- IResourceFactory resourceFactory = dataset.getResourceFactory(metadataProvider, index, itemType, metaItemType,
- compactionInfo.first, compactionInfo.second);
- IndexBuilderFactory indexBuilderFactory =
- new IndexBuilderFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
- partitioningProperties.getSplitsProvider(), resourceFactory, true);
- IndexCreateOperatorDescriptor indexCreateOp = new IndexCreateOperatorDescriptor(spec, indexBuilderFactory,
- partitioningProperties.getComputeStorageMap());
+ int[][] computeStorageMap = partitioningProperties.getComputeStorageMap();
+ IndexBuilderFactory[][] indexBuilderFactories = getIndexBuilderFactories(dataset, metadataProvider, index,
+ itemType, metaItemType, splitsProvider, compactionInfo.first, compactionInfo.second, computeStorageMap);
+ IndexCreateOperatorDescriptor indexCreateOp =
+ new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, computeStorageMap);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
partitioningProperties.getConstraints());
spec.addRoot(indexCreateOp);
return spec;
}
+ public static IndexBuilderFactory[][] getIndexBuilderFactories(Dataset dataset, MetadataProvider metadataProvider,
+ Index index, ARecordType itemType, ARecordType metaItemType, IFileSplitProvider fileSplitProvider,
+ ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+ int[][] computeStorageMap) throws AlgebricksException {
+ IndexBuilderFactory[][] indexBuilderFactories = new IndexBuilderFactory[computeStorageMap.length][];
+ for (int i = 0; i < computeStorageMap.length; i++) {
+ int len = computeStorageMap[i].length;
+ indexBuilderFactories[i] = new IndexBuilderFactory[len];
+ for (int k = 0; k < len; k++) {
+ IResourceFactory resourceFactory = dataset.getResourceFactory(metadataProvider, index, itemType,
+ metaItemType, mergePolicyFactory, mergePolicyProperties);
+ IndexBuilderFactory indexBuilderFactory =
+ new IndexBuilderFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
+ fileSplitProvider, resourceFactory, true);
+ indexBuilderFactories[i][k] = indexBuilderFactory;
+ }
+ }
+ return indexBuilderFactories;
+ }
+
public static JobSpecification compactDatasetJobSpec(Dataverse dataverse, String datasetName,
MetadataProvider metadataProvider) throws AlgebricksException {
DataverseName dataverseName = dataverse.getDataverseName();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
index 3dd84ec..e874663 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
@@ -27,7 +27,6 @@
import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.common.config.OptimizationConfUtil;
-import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.formats.base.IDataFormat;
import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -81,14 +80,12 @@
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.dataflow.std.group.AbstractAggregatorDescriptorFactory;
import org.apache.hyracks.dataflow.std.group.sort.SortGroupByOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.api.IIndexBuilderFactory;
import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.IndexCreateOperatorDescriptor;
import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
-import org.apache.hyracks.storage.common.IResourceFactory;
import org.apache.hyracks.storage.common.IStorageManager;
import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
@@ -158,13 +155,11 @@
@Override
public JobSpecification buildCreationJobSpec() throws AlgebricksException {
JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
- IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
- IResourceFactory resourceFactory = dataset.getResourceFactory(metadataProvider, sampleIdx, itemType, metaType,
- mergePolicyFactory, mergePolicyProperties);
- IIndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory(storageComponentProvider.getStorageManager(),
- fileSplitProvider, resourceFactory, true);
+ IndexBuilderFactory[][] indexBuilderFactories =
+ DatasetUtil.getIndexBuilderFactories(dataset, metadataProvider, sampleIdx, itemType, metaType,
+ fileSplitProvider, mergePolicyFactory, mergePolicyProperties, computeStorageMap);
IndexCreateOperatorDescriptor indexCreateOp =
- new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, computeStorageMap);
+ new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, computeStorageMap);
indexCreateOp.setSourceLocation(sourceLoc);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp, partitionConstraint);
spec.addRoot(indexCreateOp);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
index 0cda625..62352bf 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
@@ -24,7 +24,6 @@
import java.util.Set;
import org.apache.asterix.common.cluster.PartitioningProperties;
-import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Index;
@@ -34,14 +33,12 @@
import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.storage.am.common.api.IIndexBuilderFactory;
import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.IndexCreateOperatorDescriptor;
import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
-import org.apache.hyracks.storage.common.IResourceFactory;
public abstract class SecondaryTreeIndexOperationsHelper extends SecondaryIndexOperationsHelper {
@@ -53,14 +50,13 @@
@Override
public JobSpecification buildCreationJobSpec() throws AlgebricksException {
JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
- IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
- IResourceFactory resourceFactory = dataset.getResourceFactory(metadataProvider, index, itemType, metaType,
- mergePolicyFactory, mergePolicyProperties);
- IIndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory(storageComponentProvider.getStorageManager(),
- secondaryFileSplitProvider, resourceFactory, true);
PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset);
- IndexCreateOperatorDescriptor secondaryIndexCreateOp = new IndexCreateOperatorDescriptor(spec,
- indexBuilderFactory, partitioningProperties.getComputeStorageMap());
+ int[][] computeStorageMap = partitioningProperties.getComputeStorageMap();
+ IndexBuilderFactory[][] indexBuilderFactories =
+ DatasetUtil.getIndexBuilderFactories(dataset, metadataProvider, index, itemType, metaType,
+ secondaryFileSplitProvider, mergePolicyFactory, mergePolicyProperties, computeStorageMap);
+ IndexCreateOperatorDescriptor secondaryIndexCreateOp =
+ new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, computeStorageMap);
secondaryIndexCreateOp.setSourceLocation(sourceLoc);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
secondaryPartitionConstraint);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
index d90a212..4948a66 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
@@ -120,8 +120,10 @@
IResourceFactory primaryResourceFactory = createPrimaryResourceFactory();
IIndexBuilderFactory indexBuilderFactory =
new IndexBuilderFactory(storageManager, primarySplitProvider, primaryResourceFactory, false);
+ IIndexBuilderFactory[][] indexBuilderFactories = new IIndexBuilderFactory[1][1];
+ indexBuilderFactories[0][0] = indexBuilderFactory;
IndexCreateOperatorDescriptor primaryCreateOp =
- new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtil.getPartitionsMap(1));
+ new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, TestUtil.getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
spec.addRoot(primaryCreateOp);
runTest(spec);
@@ -168,8 +170,10 @@
IResourceFactory secondaryResourceFactory = createSecondaryResourceFactory();
IIndexBuilderFactory indexBuilderFactory =
new IndexBuilderFactory(storageManager, secondarySplitProvider, secondaryResourceFactory, false);
+ IIndexBuilderFactory[][] indexBuilderFactories = new IIndexBuilderFactory[1][1];
+ indexBuilderFactories[0][0] = indexBuilderFactory;
IndexCreateOperatorDescriptor secondaryCreateOp =
- new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtil.getPartitionsMap(1));
+ new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, TestUtil.getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryCreateOp, NC1_ID);
spec.addRoot(secondaryCreateOp);
runTest(spec);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
index 1634b2d..a8f2ef1 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
@@ -242,8 +242,10 @@
pageManagerFactory, null, null);
IIndexBuilderFactory indexBuilderFactory =
new IndexBuilderFactory(storageManager, primarySplitProvider, btreeFactory, false);
+ IIndexBuilderFactory[][] indexBuilderFactories = new IIndexBuilderFactory[1][1];
+ indexBuilderFactories[0][0] = indexBuilderFactory;
IndexCreateOperatorDescriptor primaryCreateOp =
- new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtil.getPartitionsMap(1));
+ new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, TestUtil.getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
spec.addRoot(primaryCreateOp);
runTest(spec);
@@ -305,8 +307,10 @@
JobSpecification spec = new JobSpecification();
IndexBuilderFactory indexBuilderFactory =
new IndexBuilderFactory(storageManager, secondarySplitProvider, rtreeFactory, false);
+ IIndexBuilderFactory[][] indexBuilderFactories = new IIndexBuilderFactory[1][1];
+ indexBuilderFactories[0][0] = indexBuilderFactory;
IndexCreateOperatorDescriptor secondaryCreateOp =
- new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtil.getPartitionsMap(1));
+ new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, TestUtil.getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryCreateOp, NC1_ID);
spec.addRoot(secondaryCreateOp);
runTest(spec);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java
index 61b600a..32636e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java
@@ -31,13 +31,13 @@
public class IndexCreateOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 2L;
- private final IIndexBuilderFactory indexBuilderFactory;
+ private final IIndexBuilderFactory[][] indexBuilderFactories;
private final int[][] partitionsMap;
- public IndexCreateOperatorDescriptor(IOperatorDescriptorRegistry spec, IIndexBuilderFactory indexBuilderFactory,
- int[][] partitionsMap) {
+ public IndexCreateOperatorDescriptor(IOperatorDescriptorRegistry spec,
+ IIndexBuilderFactory[][] indexBuilderFactories, int[][] partitionsMap) {
super(spec, 0, 0);
- this.indexBuilderFactory = indexBuilderFactory;
+ this.indexBuilderFactories = indexBuilderFactories;
this.partitionsMap = partitionsMap;
}
@@ -45,9 +45,10 @@
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
int[] storagePartitions = partitionsMap[partition];
+ IIndexBuilderFactory[] partitionIndexBuilderFactories = indexBuilderFactories[partition];
IIndexBuilder[] indexBuilders = new IIndexBuilder[storagePartitions.length];
for (int i = 0; i < storagePartitions.length; i++) {
- indexBuilders[i] = indexBuilderFactory.create(ctx, storagePartitions[i]);
+ indexBuilders[i] = partitionIndexBuilderFactories[i].create(ctx, storagePartitions[i]);
}
return new IndexCreateOperatorNodePushable(indexBuilders);
}