[ASTERIXDB-3318][RUN] Create index builder factory per partition

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
LSMIndexIOOperationCallbackFactory factory is currently shared among
index builder factories. When index builders run concurrently
and without synchronization, this causes an issue because
LSMIndexIOOperationCallbackFactory is stateful.

Change-Id: I4c3b0b982b206ac3eea3653c68e3fecc31145cda
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17976
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 93802c2..43f40eb 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -339,7 +339,8 @@
 
         JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
         PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset);
-        FileSplit[] fs = partitioningProperties.getSplitsProvider().getFileSplits();
+        IFileSplitProvider splitsProvider = partitioningProperties.getSplitsProvider();
+        FileSplit[] fs = splitsProvider.getFileSplits();
         StringBuilder sb = new StringBuilder();
         for (FileSplit f : fs) {
             sb.append(f).append(" ");
@@ -349,19 +350,37 @@
                 DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
         // prepare a LocalResourceMetadata which will be stored in NC's local resource
         // repository
-        IResourceFactory resourceFactory = dataset.getResourceFactory(metadataProvider, index, itemType, metaItemType,
-                compactionInfo.first, compactionInfo.second);
-        IndexBuilderFactory indexBuilderFactory =
-                new IndexBuilderFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
-                        partitioningProperties.getSplitsProvider(), resourceFactory, true);
-        IndexCreateOperatorDescriptor indexCreateOp = new IndexCreateOperatorDescriptor(spec, indexBuilderFactory,
-                partitioningProperties.getComputeStorageMap());
+        int[][] computeStorageMap = partitioningProperties.getComputeStorageMap();
+        IndexBuilderFactory[][] indexBuilderFactories = getIndexBuilderFactories(dataset, metadataProvider, index,
+                itemType, metaItemType, splitsProvider, compactionInfo.first, compactionInfo.second, computeStorageMap);
+        IndexCreateOperatorDescriptor indexCreateOp =
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, computeStorageMap);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
                 partitioningProperties.getConstraints());
         spec.addRoot(indexCreateOp);
         return spec;
     }
 
+    public static IndexBuilderFactory[][] getIndexBuilderFactories(Dataset dataset, MetadataProvider metadataProvider,
+            Index index, ARecordType itemType, ARecordType metaItemType, IFileSplitProvider fileSplitProvider,
+            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+            int[][] computeStorageMap) throws AlgebricksException {
+        IndexBuilderFactory[][] indexBuilderFactories = new IndexBuilderFactory[computeStorageMap.length][];
+        for (int i = 0; i < computeStorageMap.length; i++) {
+            int len = computeStorageMap[i].length;
+            indexBuilderFactories[i] = new IndexBuilderFactory[len];
+            for (int k = 0; k < len; k++) {
+                IResourceFactory resourceFactory = dataset.getResourceFactory(metadataProvider, index, itemType,
+                        metaItemType, mergePolicyFactory, mergePolicyProperties);
+                IndexBuilderFactory indexBuilderFactory =
+                        new IndexBuilderFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
+                                fileSplitProvider, resourceFactory, true);
+                indexBuilderFactories[i][k] = indexBuilderFactory;
+            }
+        }
+        return indexBuilderFactories;
+    }
+
     public static JobSpecification compactDatasetJobSpec(Dataverse dataverse, String datasetName,
             MetadataProvider metadataProvider) throws AlgebricksException {
         DataverseName dataverseName = dataverse.getDataverseName();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
index 3dd84ec..e874663 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
@@ -27,7 +27,6 @@
 
 import org.apache.asterix.common.cluster.PartitioningProperties;
 import org.apache.asterix.common.config.OptimizationConfUtil;
-import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.formats.base.IDataFormat;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -81,14 +80,12 @@
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.dataflow.std.group.AbstractAggregatorDescriptorFactory;
 import org.apache.hyracks.dataflow.std.group.sort.SortGroupByOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.api.IIndexBuilderFactory;
 import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IndexCreateOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
-import org.apache.hyracks.storage.common.IResourceFactory;
 import org.apache.hyracks.storage.common.IStorageManager;
 import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
 
@@ -158,13 +155,11 @@
     @Override
     public JobSpecification buildCreationJobSpec() throws AlgebricksException {
         JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
-        IResourceFactory resourceFactory = dataset.getResourceFactory(metadataProvider, sampleIdx, itemType, metaType,
-                mergePolicyFactory, mergePolicyProperties);
-        IIndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory(storageComponentProvider.getStorageManager(),
-                fileSplitProvider, resourceFactory, true);
+        IndexBuilderFactory[][] indexBuilderFactories =
+                DatasetUtil.getIndexBuilderFactories(dataset, metadataProvider, sampleIdx, itemType, metaType,
+                        fileSplitProvider, mergePolicyFactory, mergePolicyProperties, computeStorageMap);
         IndexCreateOperatorDescriptor indexCreateOp =
-                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, computeStorageMap);
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, computeStorageMap);
         indexCreateOp.setSourceLocation(sourceLoc);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp, partitionConstraint);
         spec.addRoot(indexCreateOp);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
index 0cda625..62352bf 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
@@ -24,7 +24,6 @@
 import java.util.Set;
 
 import org.apache.asterix.common.cluster.PartitioningProperties;
-import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
@@ -34,14 +33,12 @@
 import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.storage.am.common.api.IIndexBuilderFactory;
 import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IndexCreateOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
-import org.apache.hyracks.storage.common.IResourceFactory;
 
 public abstract class SecondaryTreeIndexOperationsHelper extends SecondaryIndexOperationsHelper {
 
@@ -53,14 +50,13 @@
     @Override
     public JobSpecification buildCreationJobSpec() throws AlgebricksException {
         JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
-        IResourceFactory resourceFactory = dataset.getResourceFactory(metadataProvider, index, itemType, metaType,
-                mergePolicyFactory, mergePolicyProperties);
-        IIndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory(storageComponentProvider.getStorageManager(),
-                secondaryFileSplitProvider, resourceFactory, true);
         PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset);
-        IndexCreateOperatorDescriptor secondaryIndexCreateOp = new IndexCreateOperatorDescriptor(spec,
-                indexBuilderFactory, partitioningProperties.getComputeStorageMap());
+        int[][] computeStorageMap = partitioningProperties.getComputeStorageMap();
+        IndexBuilderFactory[][] indexBuilderFactories =
+                DatasetUtil.getIndexBuilderFactories(dataset, metadataProvider, index, itemType, metaType,
+                        secondaryFileSplitProvider, mergePolicyFactory, mergePolicyProperties, computeStorageMap);
+        IndexCreateOperatorDescriptor secondaryIndexCreateOp =
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, computeStorageMap);
         secondaryIndexCreateOp.setSourceLocation(sourceLoc);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
                 secondaryPartitionConstraint);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
index d90a212..4948a66 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
@@ -120,8 +120,10 @@
         IResourceFactory primaryResourceFactory = createPrimaryResourceFactory();
         IIndexBuilderFactory indexBuilderFactory =
                 new IndexBuilderFactory(storageManager, primarySplitProvider, primaryResourceFactory, false);
+        IIndexBuilderFactory[][] indexBuilderFactories = new IIndexBuilderFactory[1][1];
+        indexBuilderFactories[0][0] = indexBuilderFactory;
         IndexCreateOperatorDescriptor primaryCreateOp =
-                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtil.getPartitionsMap(1));
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, TestUtil.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
         spec.addRoot(primaryCreateOp);
         runTest(spec);
@@ -168,8 +170,10 @@
         IResourceFactory secondaryResourceFactory = createSecondaryResourceFactory();
         IIndexBuilderFactory indexBuilderFactory =
                 new IndexBuilderFactory(storageManager, secondarySplitProvider, secondaryResourceFactory, false);
+        IIndexBuilderFactory[][] indexBuilderFactories = new IIndexBuilderFactory[1][1];
+        indexBuilderFactories[0][0] = indexBuilderFactory;
         IndexCreateOperatorDescriptor secondaryCreateOp =
-                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtil.getPartitionsMap(1));
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, TestUtil.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryCreateOp, NC1_ID);
         spec.addRoot(secondaryCreateOp);
         runTest(spec);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
index 1634b2d..a8f2ef1 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
@@ -242,8 +242,10 @@
                 pageManagerFactory, null, null);
         IIndexBuilderFactory indexBuilderFactory =
                 new IndexBuilderFactory(storageManager, primarySplitProvider, btreeFactory, false);
+        IIndexBuilderFactory[][] indexBuilderFactories = new IIndexBuilderFactory[1][1];
+        indexBuilderFactories[0][0] = indexBuilderFactory;
         IndexCreateOperatorDescriptor primaryCreateOp =
-                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtil.getPartitionsMap(1));
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, TestUtil.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
         spec.addRoot(primaryCreateOp);
         runTest(spec);
@@ -305,8 +307,10 @@
         JobSpecification spec = new JobSpecification();
         IndexBuilderFactory indexBuilderFactory =
                 new IndexBuilderFactory(storageManager, secondarySplitProvider, rtreeFactory, false);
+        IIndexBuilderFactory[][] indexBuilderFactories = new IIndexBuilderFactory[1][1];
+        indexBuilderFactories[0][0] = indexBuilderFactory;
         IndexCreateOperatorDescriptor secondaryCreateOp =
-                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtil.getPartitionsMap(1));
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, TestUtil.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryCreateOp, NC1_ID);
         spec.addRoot(secondaryCreateOp);
         runTest(spec);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java
index 61b600a..32636e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java
@@ -31,13 +31,13 @@
 public class IndexCreateOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
     private static final long serialVersionUID = 2L;
-    private final IIndexBuilderFactory indexBuilderFactory;
+    private final IIndexBuilderFactory[][] indexBuilderFactories;
     private final int[][] partitionsMap;
 
-    public IndexCreateOperatorDescriptor(IOperatorDescriptorRegistry spec, IIndexBuilderFactory indexBuilderFactory,
-            int[][] partitionsMap) {
+    public IndexCreateOperatorDescriptor(IOperatorDescriptorRegistry spec,
+            IIndexBuilderFactory[][] indexBuilderFactories, int[][] partitionsMap) {
         super(spec, 0, 0);
-        this.indexBuilderFactory = indexBuilderFactory;
+        this.indexBuilderFactories = indexBuilderFactories;
         this.partitionsMap = partitionsMap;
     }
 
@@ -45,9 +45,10 @@
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         int[] storagePartitions = partitionsMap[partition];
+        IIndexBuilderFactory[] partitionIndexBuilderFactories = indexBuilderFactories[partition];
         IIndexBuilder[] indexBuilders = new IIndexBuilder[storagePartitions.length];
         for (int i = 0; i < storagePartitions.length; i++) {
-            indexBuilders[i] = indexBuilderFactory.create(ctx, storagePartitions[i]);
+            indexBuilders[i] = partitionIndexBuilderFactories[i].create(ctx, storagePartitions[i]);
         }
         return new IndexCreateOperatorNodePushable(indexBuilders);
     }