[ASTERIXDB-3144][RT] Run correlated index bulkload at storage parallelism

- user model changes: no
- storage format changes: no
- interface changes: no

Details:

- To ensure correctness, always run correlated secondary index
  bulkload jobs using storage parallelism regardless of the number
  of compute partitions. This will ensure that each storage partition
  will produce the corresponding secondary index components.

Change-Id: I5f38e4b06bcd91479bae544619bf3a96dde3c500
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17512
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 130e39c..e882129 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -585,7 +585,20 @@
             MetadataProvider metadataProvider) throws AlgebricksException {
         PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset);
         AlgebricksPartitionConstraint primaryPartitionConstraint = partitioningProperties.getConstraints();
+        IOperatorDescriptor dummyKeyProviderOp = createDummyKeyProviderOp(spec);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, dummyKeyProviderOp,
+                primaryPartitionConstraint);
+        return dummyKeyProviderOp;
+    }
 
+    public static IOperatorDescriptor createCorrelatedDummyKeyProviderOp(JobSpecification spec,
+            AlgebricksPartitionConstraint apc) throws AlgebricksException {
+        IOperatorDescriptor dummyKeyProviderOp = createDummyKeyProviderOp(spec);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, dummyKeyProviderOp, apc);
+        return dummyKeyProviderOp;
+    }
+
+    private static IOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec) throws AlgebricksException {
         // Build dummy tuple containing one field with a dummy value inside.
         ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
         DataOutput dos = tb.getDataOutput();
@@ -602,8 +615,6 @@
         RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
         ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
                 keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp,
-                primaryPartitionConstraint);
         return keyProviderOp;
     }
 
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
index c596137..2b948ef 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
@@ -75,7 +75,8 @@
         IndexUtil.bindJobEventListener(spec, metadataProvider);
 
         // Create dummy key provider for feeding the primary index scan.
-        IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
+        IOperatorDescriptor keyProviderOp =
+                DatasetUtil.createCorrelatedDummyKeyProviderOp(spec, primaryPartitionConstraint);
 
         // Create primary index scan op.
         IOperatorDescriptor primaryScanOp = createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
index 00cc595..cd3f01c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
@@ -216,7 +216,8 @@
         IndexUtil.bindJobEventListener(spec, metadataProvider);
 
         // Create dummy key provider for feeding the primary index scan.
-        IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
+        IOperatorDescriptor keyProviderOp =
+                DatasetUtil.createCorrelatedDummyKeyProviderOp(spec, primaryPartitionConstraint);
 
         // Create primary index scan op.
         IOperatorDescriptor primaryScanOp = createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
index e26aab3..302ad74 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
@@ -188,7 +188,8 @@
         assert dataset.getDatasetType() == DatasetType.INTERNAL;
 
         // Create dummy key provider for feeding the primary index scan.
-        IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
+        IOperatorDescriptor keyProviderOp =
+                DatasetUtil.createCorrelatedDummyKeyProviderOp(spec, primaryPartitionConstraint);
         IndexUtil.bindJobEventListener(spec, metadataProvider);
 
         // Create primary index scan op.
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index 5d6c13c..1a58423 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -30,6 +30,7 @@
 import org.apache.asterix.common.config.OptimizationConfUtil;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.formats.base.IDataFormat;
 import org.apache.asterix.formats.nontagged.BinaryBooleanInspector;
 import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
@@ -74,6 +75,7 @@
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
@@ -211,8 +213,9 @@
         payloadSerde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
         metaSerde =
                 metaType == null ? null : SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType);
-        PartitioningProperties partitioningProperties =
-                metadataProvider.getPartitioningProperties(dataset, index.getIndexName());
+        PartitioningProperties partitioningProperties;
+        partitioningProperties =
+                getSecondaryIndexBulkloadPartitioningProperties(metadataProvider, dataset, index.getIndexName());
         secondaryFileSplitProvider = partitioningProperties.getSpiltsProvider();
         secondaryPartitionConstraint = partitioningProperties.getConstraints();
         numPrimaryKeys = dataset.getPrimaryKeys().size();
@@ -223,8 +226,8 @@
             } else {
                 numFilterFields = 0;
             }
-
-            PartitioningProperties datasetPartitioningProperties = metadataProvider.getPartitioningProperties(dataset);
+            PartitioningProperties datasetPartitioningProperties = getSecondaryIndexBulkloadPartitioningProperties(
+                    metadataProvider, dataset, dataset.getDatasetName());
             primaryFileSplitProvider = datasetPartitioningProperties.getSpiltsProvider();
             primaryPartitionConstraint = datasetPartitioningProperties.getConstraints();
             setPrimaryRecDescAndComparators();
@@ -527,4 +530,20 @@
     public AlgebricksPartitionConstraint getSecondaryPartitionConstraint() {
         return secondaryPartitionConstraint;
     }
+
+    private PartitioningProperties getSecondaryIndexBulkloadPartitioningProperties(MetadataProvider mp, Dataset dataset,
+            String indexName) throws AlgebricksException {
+        PartitioningProperties partitioningProperties = mp.getPartitioningProperties(dataset, indexName);
+        // special case for bulkloading secondary indexes for datasets with correldated merge policy
+        // to ensure correctness, we will run in as many locations as storage partitions
+        // this will not be needed once ASTERIXDB-3176 is implemented
+        if (this instanceof SecondaryCorrelatedTreeIndexOperationsHelper) {
+            FileSplit[] fileSplits = partitioningProperties.getSpiltsProvider().getFileSplits();
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> sp =
+                    StoragePathUtil.splitProviderAndPartitionConstraints(fileSplits);
+            return PartitioningProperties.of(sp.getFirst(), sp.getSecond(),
+                    DataPartitioningProvider.getOneToOnePartitionsMap(fileSplits.length));
+        }
+        return partitioningProperties;
+    }
 }