[ASTERIXDB-3144][HYR][RT] Make index bulkload support multiple partitions
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
This patch changes the index bulkload operators to support
operating on multiple partitions. With this change, a single index
bulkload node pushable bulk-loads multiple indexes, one per storage
partition assigned to its compute task. This is a step towards
compute/storage separation.
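For illustration, a minimal, self-contained sketch of the routing
idea (JDK-only; the class and variable names are illustrative, not
the Hyracks API): a task owns the storage partitions listed in its
row of the partitions map and picks a bulk loader per tuple by
hashing the primary key.

    import java.util.Arrays;
    import java.util.function.IntUnaryOperator;

    public final class PartitionRoutingSketch {
        public static void main(String[] args) {
            int numPartitions = 4;
            // Identity partitions map, as built by getPartitionsMap(numPartitions):
            // compute task i owns exactly storage partition i.
            int[][] partitionsMap = new int[numPartitions][];
            for (int i = 0; i < numPartitions; i++) {
                partitionsMap[i] = new int[] { i };
            }
            // The total number of storage partitions is the sum of the group sizes.
            int totalPartitions = Arrays.stream(partitionsMap).mapToInt(p -> p.length).sum();
            // Stand-in for FieldHashPartitionerFactory: hash the primary key to a partition.
            IntUnaryOperator partitioner = pk -> Math.floorMod(Integer.hashCode(pk), totalPartitions);
            for (int pk = 0; pk < 8; pk++) {
                System.out.println("pk " + pk + " -> bulk loader of partition " + partitioner.applyAsInt(pk));
            }
        }
    }

Hashing on the primary key keeps a record and all of its secondary
index entries in the same storage partition, which is what allows a
single node pushable to manage one bulk loader per owned partition.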
Change-Id: I36e242b36ed1ea2472883e8d3c1ec99d1de1c630
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17494
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 068c0ee..3ddbfd9 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -213,10 +213,15 @@
for (int i = 0; i < fieldPermutation.length; i++) {
fieldPermutation[i] = i;
}
- LSMIndexBulkLoadOperatorNodePushable op =
- new LSMIndexBulkLoadOperatorNodePushable(secondaryIndexHelperFactory, primaryIndexHelperFactory,
- ctx, 0, fieldPermutation, 1.0F, false, numElementsHint, true, secondaryIndexInfo.rDesc,
- BulkLoadUsage.CREATE_INDEX, dataset.getDatasetId(), null);
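+ // One storage partition per primary-index file split; the partitions map is the identity mapping.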
+ int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length;
+ int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions);
+ IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories;
+ ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(
+ primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
+ LSMIndexBulkLoadOperatorNodePushable op = new LSMIndexBulkLoadOperatorNodePushable(
+ secondaryIndexHelperFactory, primaryIndexHelperFactory, ctx, 0, fieldPermutation, 1.0F, false,
+ numElementsHint, true, secondaryIndexInfo.rDesc, BulkLoadUsage.CREATE_INDEX, dataset.getDatasetId(),
+ null, tuplePartitionerFactory, partitionsMap);
op.setOutputFrameWriter(0, new SinkRuntimeFactory().createPushRuntime(ctx)[0], null);
return Pair.of(secondaryIndexInfo, op);
} catch (Throwable th) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
index 3d3d1f2..06380fe 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
@@ -233,12 +233,4 @@
Assert.assertTrue(nodeFileSplit.isPresent());
return nodeFileSplit.get().getPath();
}
-
- public static int[][] getPartitionsMap(int numPartitions) {
- int[][] map = new int[numPartitions][1];
- for (int i = 0; i < numPartitions; i++) {
- map[i] = new int[] { i };
- }
- return map;
- }
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
index add708e..e1092dd 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
@@ -27,7 +27,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.asterix.app.bootstrap.TestNodeController;
-import org.apache.asterix.common.TestDataUtil;
import org.apache.asterix.common.config.DatasetConfig;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.metadata.DataverseName;
@@ -58,6 +57,7 @@
import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorNodePushable;
import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.apache.hyracks.test.support.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -176,7 +176,7 @@
dataflowHelper.open();
// try to drop in-use index (should fail)
IndexDropOperatorNodePushable dropInUseOp = new IndexDropOperatorNodePushable(helperFactory,
- EnumSet.noneOf(DropOption.class), ctx, 0, TestDataUtil.getPartitionsMap(1));
+ EnumSet.noneOf(DropOption.class), ctx, 0, TestUtils.getPartitionsMap(1));
try {
dropInUseOp.initialize();
} catch (HyracksDataException e) {
@@ -192,7 +192,7 @@
dropFailed.set(false);
// drop with option wait for in-use should be successful once the index is closed
final IndexDropOperatorNodePushable dropWithWaitOp = new IndexDropOperatorNodePushable(helperFactory,
- EnumSet.of(DropOption.IF_EXISTS, DropOption.WAIT_ON_IN_USE), ctx, 0, TestDataUtil.getPartitionsMap(1));
+ EnumSet.of(DropOption.IF_EXISTS, DropOption.WAIT_ON_IN_USE), ctx, 0, TestUtils.getPartitionsMap(1));
Thread dropThread = new Thread(() -> {
try {
dropWithWaitOp.initialize();
@@ -216,7 +216,7 @@
dropFailed.set(false);
// Dropping non-existing index
IndexDropOperatorNodePushable dropNonExistingOp = new IndexDropOperatorNodePushable(helperFactory,
- EnumSet.noneOf(DropOption.class), ctx, 0, TestDataUtil.getPartitionsMap(1));
+ EnumSet.noneOf(DropOption.class), ctx, 0, TestUtils.getPartitionsMap(1));
try {
dropNonExistingOp.initialize();
} catch (HyracksDataException e) {
@@ -232,7 +232,7 @@
// Dropping non-existing index with if exists option should be successful
dropFailed.set(false);
IndexDropOperatorNodePushable dropNonExistingWithIfExistsOp = new IndexDropOperatorNodePushable(helperFactory,
- EnumSet.of(DropOption.IF_EXISTS), ctx, 0, TestDataUtil.getPartitionsMap(1));
+ EnumSet.of(DropOption.IF_EXISTS), ctx, 0, TestUtils.getPartitionsMap(1));
try {
dropNonExistingWithIfExistsOp.initialize();
} catch (HyracksDataException e) {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 7383ca3..97d5c39 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -26,6 +26,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -740,10 +741,12 @@
// move key fields to front
int[] fieldPermutation = new int[numKeys + 1 + numFilterFields];
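+ // Track the primary key positions; they drive the tuple partitioner created below.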
+ int[] pkFields = new int[numKeys];
int i = 0;
for (LogicalVariable varKey : keys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
+ pkFields[i] = idx;
i++;
}
fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
@@ -760,11 +763,17 @@
// right callback
// (ex. what's the expected behavior when there is an error during
// bulkload?)
+ int[][] partitionsMap = getPartitionsMap(dataset);
+ int numPartitions = Arrays.stream(partitionsMap).mapToInt(partitions -> partitions.length).sum();
+ IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
+ ITuplePartitionerFactory partitionerFactory =
+ new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
IIndexDataflowHelperFactory indexHelperFactory =
new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
- LSMIndexBulkLoadOperatorDescriptor btreeBulkLoad = new LSMIndexBulkLoadOperatorDescriptor(spec, null,
- fieldPermutation, StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true,
- indexHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId(), null);
+ LSMIndexBulkLoadOperatorDescriptor btreeBulkLoad =
+ new LSMIndexBulkLoadOperatorDescriptor(spec, null, fieldPermutation,
+ StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true, indexHelperFactory,
+ null, BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory, partitionsMap);
return new Pair<>(btreeBulkLoad, splitsAndConstraint.second);
}
@@ -1086,7 +1095,7 @@
int numPartitions = getNumPartitions(splitsAndConstraint.second);
int[][] partitionsMap = getPartitionsMap(numPartitions);
IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
- ITuplePartitionerFactory tuplePartitionerFactory =
+ ITuplePartitionerFactory partitionerFactory =
new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
IOperatorDescriptor op;
@@ -1094,7 +1103,7 @@
long numElementsHint = getCardinalityPerPartitionHint(dataset);
op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
StorageConstants.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh, null,
- BulkLoadUsage.LOAD, dataset.getDatasetId(), null);
+ BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory, partitionsMap);
} else {
if (indexOp == IndexOperation.INSERT) {
ISearchOperationCallbackFactory searchCallbackFactory = dataset
@@ -1102,7 +1111,7 @@
Optional<Index> primaryKeyIndex = MetadataManager.INSTANCE
.getDatasetIndexes(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName()).stream()
- .filter(index -> index.isPrimaryKeyIndex()).findFirst();
+ .filter(Index::isPrimaryKeyIndex).findFirst();
IIndexDataflowHelperFactory pkidfh = null;
if (primaryKeyIndex.isPresent()) {
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primaryKeySplitsAndConstraint =
@@ -1111,12 +1120,12 @@
primaryKeySplitsAndConstraint.first);
}
op = createLSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, pkidfh,
- modificationCallbackFactory, searchCallbackFactory, numKeys, filterFields,
- tuplePartitionerFactory, partitionsMap);
+ modificationCallbackFactory, searchCallbackFactory, numKeys, filterFields, partitionerFactory,
+ partitionsMap);
} else {
op = createLSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
- null, true, modificationCallbackFactory, tuplePartitionerFactory, partitionsMap);
+ null, true, modificationCallbackFactory, partitionerFactory, partitionsMap);
}
}
return new Pair<>(op, splitsAndConstraint.second);
@@ -1288,7 +1297,7 @@
int numPartitions = getNumPartitions(splitsAndConstraint.second);
int[][] partitionsMap = getPartitionsMap(numPartitions);
IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
- ITuplePartitionerFactory tuplePartitionerFactory =
+ ITuplePartitionerFactory partitionerFactory =
new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
IOperatorDescriptor op;
@@ -1296,15 +1305,15 @@
long numElementsHint = getCardinalityPerPartitionHint(dataset);
op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh, null,
- BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory);
+ BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory, partitionerFactory, partitionsMap);
} else if (indexOp == IndexOperation.UPSERT) {
int operationFieldIndex = propagatedSchema.findVariable(operationVar);
op = new LSMSecondaryUpsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh,
filterFactory, prevFilterFactory, modificationCallbackFactory, operationFieldIndex,
- BinaryIntegerInspector.FACTORY, prevFieldPermutation, tuplePartitionerFactory, partitionsMap);
+ BinaryIntegerInspector.FACTORY, prevFieldPermutation, partitionerFactory, partitionsMap);
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
- filterFactory, false, modificationCallbackFactory, tuplePartitionerFactory, partitionsMap);
+ filterFactory, false, modificationCallbackFactory, partitionerFactory, partitionsMap);
}
return new Pair<>(op, splitsAndConstraint.second);
} catch (Exception e) {
@@ -1465,7 +1474,7 @@
int numPartitions = getNumPartitions(splitsAndConstraint.second);
int[][] partitionsMap = getPartitionsMap(numPartitions);
IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
- ITuplePartitionerFactory tuplePartitionerFactory =
+ ITuplePartitionerFactory partitionerFactory =
new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
IOperatorDescriptor op;
@@ -1473,17 +1482,18 @@
long numElementsHint = getCardinalityPerPartitionHint(dataset);
op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false,
- indexDataflowHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory);
+ indexDataflowHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory,
+ partitionerFactory, partitionsMap);
} else if (indexOp == IndexOperation.UPSERT) {
int operationFieldIndex = propagatedSchema.findVariable(operationVar);
op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation,
indexDataflowHelperFactory, filterFactory, prevFilterFactory, modificationCallbackFactory,
- operationFieldIndex, BinaryIntegerInspector.FACTORY, prevFieldPermutation, tuplePartitionerFactory,
+ operationFieldIndex, BinaryIntegerInspector.FACTORY, prevFieldPermutation, partitionerFactory,
partitionsMap);
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp,
- indexDataflowHelperFactory, filterFactory, false, modificationCallbackFactory,
- tuplePartitionerFactory, partitionsMap);
+ indexDataflowHelperFactory, filterFactory, false, modificationCallbackFactory, partitionerFactory,
+ partitionsMap);
}
return new Pair<>(op, splitsAndConstraint.second);
}
@@ -1585,7 +1595,7 @@
int numPartitions = getNumPartitions(splitsAndConstraint.second);
int[][] partitionsMap = getPartitionsMap(numPartitions);
IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
- ITuplePartitionerFactory tuplePartitionerFactory =
+ ITuplePartitionerFactory partitionerFactory =
new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
IOperatorDescriptor op;
@@ -1593,16 +1603,17 @@
long numElementsHint = getCardinalityPerPartitionHint(dataset);
op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataFlowFactory,
- null, BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory);
+ null, BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory, partitionerFactory,
+ partitionsMap);
} else if (indexOp == IndexOperation.UPSERT) {
int upsertOperationFieldIndex = propagatedSchema.findVariable(operationVar);
op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation, indexDataFlowFactory,
filterFactory, prevFilterFactory, modificationCallbackFactory, upsertOperationFieldIndex,
- BinaryIntegerInspector.FACTORY, prevFieldPermutation, tuplePartitionerFactory, partitionsMap);
+ BinaryIntegerInspector.FACTORY, prevFieldPermutation, partitionerFactory, partitionsMap);
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp,
- indexDataFlowFactory, filterFactory, false, modificationCallbackFactory,
- tuplePartitionerFactory, partitionsMap);
+ indexDataFlowFactory, filterFactory, false, modificationCallbackFactory, partitionerFactory,
+ partitionsMap);
}
return new Pair<>(op, splitsAndConstraint.second);
} catch (Exception e) {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 6daceb7..89dea40 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -412,10 +412,11 @@
IRecoveryManager.ResourceType.LSM_BTREE);
IndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
metadataProvider.getStorageComponentProvider().getStorageManager(), primaryFileSplitProvider);
+ int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec,
dataset.getPrimaryRecordDescriptor(metadataProvider), lowKeyFields, highKeyFields, true, true,
indexHelperFactory, false, false, null, searchCallbackFactory, null, null, false, null, null, -1, false,
- null, null, projectorFactory, null, null);
+ null, null, projectorFactory, null, partitionsMap);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
primaryPartitionConstraint);
return primarySearchOp;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
index 161e86e..43b1fb9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
@@ -19,6 +19,7 @@
package org.apache.asterix.metadata.utils;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -65,12 +66,15 @@
import org.apache.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.dataflow.std.group.AbstractAggregatorDescriptorFactory;
@@ -313,10 +317,20 @@
protected LSMIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor,
- long numElementHint) {
+ long numElementHint) throws AlgebricksException {
+ int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
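+ // The primary keys occupy the leading positions of the sample bulk-load field permutation.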
+ int[] pkFields = new int[dataset.getPrimaryKeys().size()];
+ for (int i = 0; i < pkFields.length; i++) {
+ pkFields[i] = fieldPermutation[i];
+ }
+ int numPartitions = Arrays.stream(partitionsMap).mapToInt(partitions -> partitions.length).sum();
+ IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(metadataProvider);
+ ITuplePartitionerFactory partitionerFactory =
+ new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
LSMIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new LSMIndexBulkLoadOperatorDescriptor(spec,
recordDesc, fieldPermutation, fillFactor, false, numElementHint, true, dataflowHelperFactory, null,
- LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage.LOAD, dataset.getDatasetId(), null);
+ LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory,
+ partitionsMap);
treeIndexBulkLoadOp.setSourceLocation(sourceLoc);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
partitionConstraint);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryArrayIndexBTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryArrayIndexBTreeOperationsHelper.java
index a6dc65d..5b4fac8 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryArrayIndexBTreeOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryArrayIndexBTreeOperationsHelper.java
@@ -330,8 +330,10 @@
// Apply the bulk loading operator.
IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(
metadataProvider.getStorageComponentProvider().getStorageManager(), secondaryFileSplitProvider);
- targetOp = createTreeIndexBulkLoadOp(spec, createFieldPermutationForBulkLoadOp(numTotalSecondaryKeys),
- dataflowHelperFactory, StorageConstants.DEFAULT_TREE_FILL_FACTOR);
+ int[] fieldPermutations = createFieldPermutationForBulkLoadOp(numTotalSecondaryKeys);
+ int[] pkFields = createPkFieldPermutationForBulkLoadOp(fieldPermutations, numTotalSecondaryKeys);
+ targetOp = createTreeIndexBulkLoadOp(spec, fieldPermutations, dataflowHelperFactory,
+ StorageConstants.DEFAULT_TREE_FILL_FACTOR, pkFields);
spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, targetOp, 0);
// Apply the sink.
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
index 01d1c38..1889aa1 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
@@ -62,14 +62,16 @@
@Override
public JobSpecification buildLoadingJobSpec() throws AlgebricksException {
JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
- Index.ValueIndexDetails indexDetails = (Index.ValueIndexDetails) index.getIndexDetails();
- int[] fieldPermutation = createFieldPermutationForBulkLoadOp(indexDetails.getKeyFieldNames().size());
- IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(
- metadataProvider.getStorageComponentProvider().getStorageManager(), secondaryFileSplitProvider);
- boolean excludeUnknown = excludeUnknownKeys(index, indexDetails, anySecondaryKeyIsNullable);
if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
return spec;
} else {
+ Index.ValueIndexDetails indexDetails = (Index.ValueIndexDetails) index.getIndexDetails();
+ int numSecondaryKeys = getNumSecondaryKeys();
+ int[] fieldPermutation = createFieldPermutationForBulkLoadOp(numSecondaryKeys);
+ int[] pkFields = createPkFieldPermutationForBulkLoadOp(fieldPermutation, numSecondaryKeys);
+ IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(
+ metadataProvider.getStorageComponentProvider().getStorageManager(), secondaryFileSplitProvider);
+ boolean excludeUnknown = excludeUnknownKeys(index, indexDetails, anySecondaryKeyIsNullable);
// job spec:
// key provider -> primary idx scan -> cast assign -> (select)? -> (sort)? -> bulk load -> sink
IndexUtil.bindJobEventListener(spec, metadataProvider);
@@ -86,15 +88,14 @@
sourceOp = targetOp;
// primary index ----> cast assign op (produces the secondary index entry)
- targetOp = createAssignOp(spec, indexDetails.getKeyFieldNames().size(), secondaryRecDesc);
+ targetOp = createAssignOp(spec, numSecondaryKeys, secondaryRecDesc);
spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, targetOp, 0);
sourceOp = targetOp;
if (excludeUnknown) {
// if any of the secondary fields are nullable, then add a select op that filters nulls.
// assign op ----> select op
- targetOp =
- createFilterAllUnknownsSelectOp(spec, indexDetails.getKeyFieldNames().size(), secondaryRecDesc);
+ targetOp = createFilterAllUnknownsSelectOp(spec, numSecondaryKeys, secondaryRecDesc);
spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, targetOp, 0);
sourceOp = targetOp;
}
@@ -108,7 +109,7 @@
// cast assign op OR select op OR sort op ----> bulk load op
targetOp = createTreeIndexBulkLoadOp(spec, fieldPermutation, dataflowHelperFactory,
- StorageConstants.DEFAULT_TREE_FILL_FACTOR);
+ StorageConstants.DEFAULT_TREE_FILL_FACTOR, pkFields);
spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, targetOp, 0);
// bulk load op ----> sink op
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index 76c2a73..02c05a9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -19,6 +19,7 @@
package org.apache.asterix.metadata.utils;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -67,11 +68,14 @@
import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
@@ -439,13 +443,20 @@
}
protected LSMIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
- int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor) {
+ int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor, int[] pkFields)
+ throws AlgebricksException {
IndexDataflowHelperFactory primaryIndexDataflowHelperFactory = new IndexDataflowHelperFactory(
metadataProvider.getStorageComponentProvider().getStorageManager(), primaryFileSplitProvider);
+ int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
+ int numPartitions = Arrays.stream(partitionsMap).mapToInt(partitions -> partitions.length).sum();
+ IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(metadataProvider);
+ ITuplePartitionerFactory partitionerFactory =
+ new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
// when an index is being created (not loaded) the filtration is introduced in the pipeline -> no tuple filter
- LSMIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new LSMIndexBulkLoadOperatorDescriptor(spec,
- secondaryRecDesc, fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory,
- primaryIndexDataflowHelperFactory, BulkLoadUsage.CREATE_INDEX, dataset.getDatasetId(), null);
+ LSMIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp =
+ new LSMIndexBulkLoadOperatorDescriptor(spec, secondaryRecDesc, fieldPermutation, fillFactor, false,
+ numElementsHint, false, dataflowHelperFactory, primaryIndexDataflowHelperFactory,
+ BulkLoadUsage.CREATE_INDEX, dataset.getDatasetId(), null, partitionerFactory, partitionsMap);
treeIndexBulkLoadOp.setSourceLocation(sourceLoc);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
secondaryPartitionConstraint);
@@ -498,27 +509,6 @@
return asterixSelectOp;
}
- protected AlgebricksMetaOperatorDescriptor createExternalAssignOp(JobSpecification spec, int numSecondaryKeys,
- RecordDescriptor secondaryRecDesc) {
- int[] outColumns = new int[numSecondaryKeys];
- int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
- for (int i = 0; i < numSecondaryKeys; i++) {
- outColumns[i] = i + numPrimaryKeys + 1;
- projectionList[i] = i + numPrimaryKeys + 1;
- }
-
- IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
- System.arraycopy(secondaryFieldAccessEvalFactories, 0, sefs, 0, secondaryFieldAccessEvalFactories.length);
- //add External RIDs to the projection list
- for (int i = 0; i < numPrimaryKeys; i++) {
- projectionList[numSecondaryKeys + i] = i + 1;
- }
-
- AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
- return new AlgebricksMetaOperatorDescriptor(spec, 1, 1, new IPushRuntimeFactory[] { assign },
- new RecordDescriptor[] { secondaryRecDesc });
- }
-
@Override
public RecordDescriptor getSecondaryRecDesc() {
return secondaryRecDesc;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
index a84454f..fa55105 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
@@ -169,7 +169,8 @@
enforcedRecDesc = new RecordDescriptor(enforcedRecFields, enforcedTypeTraits);
// For tokenization, sorting and loading.
// One token (+ optional partitioning field) + primary keys.
- numTokenKeyPairFields = (!isPartitioned) ? 1 + numPrimaryKeys : 2 + numPrimaryKeys;
+ int pkOff = getNumTokens();
+ numTokenKeyPairFields = pkOff + numPrimaryKeys;
ISerializerDeserializer[] tokenKeyPairFields =
new ISerializerDeserializer[numTokenKeyPairFields + numFilterFields];
ITypeTraits[] tokenKeyPairTypeTraits = new ITypeTraits[numTokenKeyPairFields];
@@ -177,12 +178,10 @@
tokenKeyPairFields[0] = serdeProvider.getSerializerDeserializer(secondaryKeyType);
tokenKeyPairTypeTraits[0] = tokenTypeTraits[0];
tokenKeyPairComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
- int pkOff = 1;
if (isPartitioned) {
tokenKeyPairFields[1] = ShortSerializerDeserializer.INSTANCE;
tokenKeyPairTypeTraits[1] = tokenTypeTraits[1];
tokenKeyPairComparatorFactories[1] = ShortBinaryComparatorFactory.INSTANCE;
- pkOff = 2;
}
if (numPrimaryKeys > 0) {
tokenKeyPairFields[pkOff] = primaryRecDesc.getFields()[0];
@@ -303,14 +302,25 @@
return sortOp;
}
- private AbstractSingleActivityOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec) {
+ private AbstractSingleActivityOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec)
+ throws AlgebricksException {
int[] fieldPermutation = new int[numTokenKeyPairFields + numFilterFields];
for (int i = 0; i < fieldPermutation.length; i++) {
fieldPermutation[i] = i;
}
+ // TODO: determine whether numPrimaryKeys can be 0 here; if so, pkFields is empty
+ int[] pkFields = new int[numPrimaryKeys];
+ int pkOffset = getNumTokens();
+ for (int i = 0; i < pkFields.length; i++) {
+ pkFields[i] = fieldPermutation[pkOffset + i];
+ }
IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(
metadataProvider.getStorageComponentProvider().getStorageManager(), secondaryFileSplitProvider);
return createTreeIndexBulkLoadOp(spec, fieldPermutation, dataflowHelperFactory,
- StorageConstants.DEFAULT_TREE_FILL_FACTOR);
+ StorageConstants.DEFAULT_TREE_FILL_FACTOR, pkFields);
+ }
+
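+ // One token field, plus a length-partitioning field (short) when the index is partitioned.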
+ private int getNumTokens() {
+ return isPartitioned ? 2 : 1;
}
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
index 792d495..1c20eff 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
@@ -192,6 +192,7 @@
***************************************************/
JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
int[] fieldPermutation = createFieldPermutationForBulkLoadOp(numNestedSecondaryKeyFields);
+ int[] pkFields = createPkFieldPermutationForBulkLoadOp(fieldPermutation, numNestedSecondaryKeyFields);
int numNestedSecondaryKeFieldsConsideringPointMBR =
isPointMBR ? numNestedSecondaryKeyFields / 2 : numNestedSecondaryKeyFields;
RecordDescriptor secondaryRecDescConsideringPointMBR =
@@ -230,7 +231,7 @@
isPointMBR ? secondaryRecDescForPointMBR : secondaryRecDesc);
// Create secondary RTree bulk load op.
TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec, fieldPermutation,
- indexDataflowHelperFactory, StorageConstants.DEFAULT_TREE_FILL_FACTOR);
+ indexDataflowHelperFactory, StorageConstants.DEFAULT_TREE_FILL_FACTOR, pkFields);
SinkRuntimeFactory sinkRuntimeFactory = new SinkRuntimeFactory();
sinkRuntimeFactory.setSourceLocation(sourceLoc);
AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
index 14c9dda..4fda6e4 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
@@ -110,4 +110,12 @@
spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
return spec;
}
+
+ protected int[] createPkFieldPermutationForBulkLoadOp(int[] fieldsPermutation, int numSecondaryKeyFields) {
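+ // In the bulk-load field permutation, the primary key fields follow the secondary key fields.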
+ int[] pkFieldPermutation = new int[numPrimaryKeys];
+ for (int i = 0; i < pkFieldPermutation.length; i++) {
+ pkFieldPermutation[i] = fieldsPermutation[numSecondaryKeyFields + i];
+ }
+ return pkFieldPermutation;
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java
index ea84ca3..6edb949 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java
@@ -22,6 +22,7 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -31,7 +32,7 @@
public class LSMIndexBulkLoadOperatorDescriptor extends TreeIndexBulkLoadOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
public enum BulkLoadUsage {
LOAD,
@@ -44,19 +45,17 @@
protected final int datasetId;
- protected final ITupleFilterFactory tupleFilterFactory;
-
public LSMIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint,
boolean checkIfEmptyIndex, IIndexDataflowHelperFactory indexHelperFactory,
IIndexDataflowHelperFactory primaryIndexHelperFactory, BulkLoadUsage usage, int datasetId,
- ITupleFilterFactory tupleFilterFactory) {
+ ITupleFilterFactory tupleFilterFactory, ITuplePartitionerFactory partitionerFactory,
+ int[][] partitionsMap) {
super(spec, outRecDesc, fieldPermutation, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
- indexHelperFactory);
+ indexHelperFactory, tupleFilterFactory, partitionerFactory, partitionsMap);
this.primaryIndexHelperFactory = primaryIndexHelperFactory;
this.usage = usage;
this.datasetId = datasetId;
- this.tupleFilterFactory = tupleFilterFactory;
}
@Override
@@ -65,6 +64,6 @@
return new LSMIndexBulkLoadOperatorNodePushable(indexHelperFactory, primaryIndexHelperFactory, ctx, partition,
fieldPermutation, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0), usage, datasetId,
- tupleFilterFactory);
+ tupleFilterFactory, partitionerFactory, partitionsMap);
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
index 52e3b2f..367f670 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
@@ -27,6 +27,7 @@
import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
@@ -38,29 +39,35 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
+import org.apache.hyracks.storage.common.IIndex;
public class LSMIndexBulkLoadOperatorNodePushable extends IndexBulkLoadOperatorNodePushable {
- protected final BulkLoadUsage usage;
- protected final IIndexDataflowHelper primaryIndexHelper;
+ protected final BulkLoadUsage usage;
+ protected final IIndexDataflowHelper[] primaryIndexHelpers;
protected final IDatasetLifecycleManager datasetManager;
protected final int datasetId;
protected final int partition;
- protected ILSMIndex primaryIndex;
+ protected ILSMIndex[] primaryIndexes;
public LSMIndexBulkLoadOperatorNodePushable(IIndexDataflowHelperFactory indexDataflowHelperFactory,
IIndexDataflowHelperFactory primaryIndexDataflowHelperFactory, IHyracksTaskContext ctx, int partition,
int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint,
boolean checkIfEmptyIndex, RecordDescriptor recDesc, BulkLoadUsage usage, int datasetId,
- ITupleFilterFactory tupleFilterFactory) throws HyracksDataException {
+ ITupleFilterFactory tupleFilterFactory, ITuplePartitionerFactory partitionerFactory, int[][] partitionsMap)
+ throws HyracksDataException {
super(indexDataflowHelperFactory, ctx, partition, fieldPermutation, fillFactor, verifyInput, numElementsHint,
- checkIfEmptyIndex, recDesc, tupleFilterFactory);
+ checkIfEmptyIndex, recDesc, tupleFilterFactory, partitionerFactory, partitionsMap);
if (primaryIndexDataflowHelperFactory != null) {
- this.primaryIndexHelper =
- priamryIndexDataflowHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
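+ // Create one dataflow helper per storage partition owned by this task;
+ // the index instances are opened lazily in initializeBulkLoader().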
+ primaryIndexHelpers = new IIndexDataflowHelper[partitions.length];
+ primaryIndexes = new ILSMIndex[partitions.length];
+ for (int i = 0; i < partitions.length; i++) {
+ primaryIndexHelpers[i] = primaryIndexDataflowHelperFactory
+ .create(ctx.getJobletContext().getServiceContext(), partitions[i]);
+ }
} else {
- this.primaryIndexHelper = null;
+ primaryIndexHelpers = null;
}
this.usage = usage;
this.datasetId = datasetId;
@@ -71,17 +78,17 @@
}
@Override
- protected void initializeBulkLoader() throws HyracksDataException {
+ protected void initializeBulkLoader(IIndex index, int indexId) throws HyracksDataException {
ILSMIndex targetIndex = (ILSMIndex) index;
Map<String, Object> parameters = new HashMap<>();
parameters.put(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID, LSMComponentId.DEFAULT_COMPONENT_ID);
if (usage.equals(BulkLoadUsage.LOAD)) {
- bulkLoader = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
- parameters);
+ bulkLoaders[indexId] = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint,
+ checkIfEmptyIndex, parameters);
} else {
- primaryIndexHelper.open();
- primaryIndex = (ILSMIndex) primaryIndexHelper.getIndexInstance();
- List<ILSMDiskComponent> primaryComponents = primaryIndex.getDiskComponents();
+ primaryIndexHelpers[indexId].open();
+ primaryIndexes[indexId] = (ILSMIndex) primaryIndexHelpers[indexId].getIndexInstance();
+ List<ILSMDiskComponent> primaryComponents = primaryIndexes[indexId].getDiskComponents();
if (!primaryComponents.isEmpty()) {
ILSMComponentId bulkloadId = LSMComponentIdUtils.union(primaryComponents.get(0).getId(),
primaryComponents.get(primaryComponents.size() - 1).getId());
@@ -90,8 +97,8 @@
parameters.put(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID,
LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID);
}
- bulkLoader = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
- parameters);
+ bulkLoaders[indexId] = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint,
+ checkIfEmptyIndex, parameters);
}
}
@@ -101,8 +108,8 @@
try {
super.close();
} finally {
- if (primaryIndex != null) {
- primaryIndexHelper.close();
+ if (primaryIndexHelpers != null) {
+ closeIndexes(primaryIndexes, primaryIndexHelpers);
}
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
index 2b838d3..f5ec1eb 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
@@ -178,12 +178,4 @@
public boolean expensiveThanMaterialization() {
return false;
}
-
- public static int[][] getPartitionsMap(int numPartitions) {
- int[][] map = new int[numPartitions][1];
- for (int i = 0; i < numPartitions; i++) {
- map[i] = new int[] { i };
- }
- return map;
- }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java
index cc922ff..ccd563a 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java
@@ -150,7 +150,7 @@
IIndexDataflowHelperFactory primaryHelperFactory =
new IndexDataflowHelperFactory(storageManager, primarySplitProvider);
- int[][] partitionsMap = getPartitionsMap(splitNCs.length);
+ int[][] partitionsMap = JobHelper.getPartitionsMap(splitNCs.length);
int[] pkFields = new int[] { primaryFieldPermutation[0] };
IBinaryHashFunctionFactory[] pkHashFunFactories =
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) };
@@ -216,12 +216,4 @@
return spec;
}
-
- public static int[][] getPartitionsMap(int numPartitions) {
- int[][] map = new int[numPartitions][1];
- for (int i = 0; i < numPartitions; i++) {
- map[i] = new int[] { i };
- }
- return map;
- }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/JobHelper.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/JobHelper.java
index 4b219e8..7b4a885 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/JobHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/JobHelper.java
@@ -40,4 +40,12 @@
public static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, String[] splitNCs) {
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, splitNCs);
}
+
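+ // Identity mapping: compute task i owns exactly storage partition i.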
+ public static int[][] getPartitionsMap(int numPartitions) {
+ int[][] map = new int[numPartitions][1];
+ for (int i = 0; i < numPartitions; i++) {
+ map[i] = new int[] { i };
+ }
+ return map;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
index 0eaa083..1495e60 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.job.JobId;
@@ -35,6 +36,7 @@
import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
@@ -147,8 +149,15 @@
IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.btreeName);
IIndexDataflowHelperFactory dataflowHelperFactory =
new IndexDataflowHelperFactory(storageManager, btreeSplitProvider);
- TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec, recDesc,
- fieldPermutation, 0.7f, false, 1000L, true, dataflowHelperFactory);
+ int[][] partitionsMap = JobHelper.getPartitionsMap(splitNCs.length);
+ int[] pkFields = new int[] { fieldPermutation[0] };
+ IBinaryHashFunctionFactory[] pkHashFunFactories =
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) };
+ ITuplePartitionerFactory tuplePartitionerFactory =
+ new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, splitNCs.length);
+ TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad =
+ new TreeIndexBulkLoadOperatorDescriptor(spec, recDesc, fieldPermutation, 0.7f, false, 1000L, true,
+ dataflowHelperFactory, null, tuplePartitionerFactory, partitionsMap);
JobHelper.createPartitionConstraint(spec, btreeBulkLoad, splitNCs);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
index cdb8230..94018b1 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
@@ -20,17 +20,21 @@
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.data.std.accessors.IntegerBinaryComparatorFactory;
+import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
import org.apache.hyracks.data.std.accessors.UTF8StringBinaryComparatorFactory;
import org.apache.hyracks.data.std.primitive.IntegerPointable;
import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
@@ -143,8 +147,15 @@
IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.secondaryBTreeName);
IIndexDataflowHelperFactory secondaryHelperFactory =
new IndexDataflowHelperFactory(storageManager, btreeSplitProvider);
- TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec, null,
- fieldPermutation, 0.7f, false, 1000L, true, secondaryHelperFactory);
+ int[][] partitionsMap = JobHelper.getPartitionsMap(splitNCs.length);
+ int[] pkFields = new int[] { fieldPermutation[1] };
+ IBinaryHashFunctionFactory[] pkHashFunFactories =
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) };
+ ITuplePartitionerFactory tuplePartitionerFactory =
+ new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, splitNCs.length);
+ TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad =
+ new TreeIndexBulkLoadOperatorDescriptor(spec, null, fieldPermutation, 0.7f, false, 1000L, true,
+ secondaryHelperFactory, null, tuplePartitionerFactory, partitionsMap);
JobHelper.createPartitionConstraint(spec, btreeBulkLoad, splitNCs);
NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
JobHelper.createPartitionConstraint(spec, nsOpDesc, splitNCs);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
index 86525f1..1776b09 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
@@ -33,6 +33,7 @@
import java.io.DataOutput;
import java.io.File;
+import java.util.Arrays;
import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -71,10 +72,10 @@
import org.apache.hyracks.storage.common.IStorageManager;
import org.apache.hyracks.test.support.TestStorageManager;
import org.apache.hyracks.test.support.TestStorageManagerComponentHolder;
+import org.apache.hyracks.test.support.TestUtils;
import org.apache.hyracks.tests.am.common.ITreeIndexOperatorTestHelper;
import org.apache.hyracks.tests.am.common.TreeOperatorTestHelper;
import org.apache.hyracks.tests.integration.AbstractIntegrationTest;
-import org.apache.hyracks.tests.integration.TestUtil;
import org.junit.After;
import org.junit.Before;
@@ -120,7 +121,7 @@
IIndexBuilderFactory indexBuilderFactory =
new IndexBuilderFactory(storageManager, primarySplitProvider, primaryResourceFactory, false);
IndexCreateOperatorDescriptor primaryCreateOp =
- new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, getPartitionsMap(1));
+ new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtils.getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
spec.addRoot(primaryCreateOp);
runTest(spec);
@@ -143,8 +144,12 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID);
int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
- TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- primaryRecDesc, fieldPermutation, 0.7f, true, 1000L, true, primaryHelperFactory);
+ int[][] partitionsMap = TestUtils.getPartitionsMap(1);
+ int numPartitions = Arrays.stream(partitionsMap).mapToInt(partitions -> partitions.length).sum();
+ ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(primaryKeyFieldPermutation,
+ primaryHashFunFactories, numPartitions);
+ TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad =
+ new TreeIndexBulkLoadOperatorDescriptor(spec, primaryRecDesc, fieldPermutation, 0.7f, true, 1000L, true,
+ primaryHelperFactory, null, tuplePartitionerFactory, partitionsMap);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
@@ -164,7 +169,7 @@
IIndexBuilderFactory indexBuilderFactory =
new IndexBuilderFactory(storageManager, secondarySplitProvider, secondaryResourceFactory, false);
IndexCreateOperatorDescriptor secondaryCreateOp =
- new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, getPartitionsMap(1));
+ new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtils.getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryCreateOp, NC1_ID);
spec.addRoot(secondaryCreateOp);
runTest(spec);
@@ -205,8 +210,13 @@
// load secondary index
int[] fieldPermutation = { 3, 0 };
- TreeIndexBulkLoadOperatorDescriptor secondaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- secondaryRecDesc, fieldPermutation, 0.7f, true, 1000L, true, secondaryHelperFactory);
+ int[][] partitionsMap = TestUtils.getPartitionsMap(1);
+ int numPartitions = Arrays.stream(partitionsMap).mapToInt(partitions -> partitions.length).sum();
+ ITuplePartitionerFactory tuplePartitionerFactory2 =
+ new FieldHashPartitionerFactory(secondaryPKFieldPermutationB, primaryHashFunFactories, numPartitions);
+ TreeIndexBulkLoadOperatorDescriptor secondaryBtreeBulkLoad =
+ new TreeIndexBulkLoadOperatorDescriptor(spec, secondaryRecDesc, fieldPermutation, 0.7f, true, 1000L,
+ true, secondaryHelperFactory, null, tuplePartitionerFactory2, partitionsMap);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBtreeBulkLoad, NC1_ID);
NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
@@ -234,7 +244,7 @@
new DelimitedDataTupleParserFactory(inputParserFactories, '|'), ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
- int[][] partitionsMap = TestUtil.getPartitionsMap(ordersSplits.length);
+ int[][] partitionsMap = TestUtils.getPartitionsMap(ordersSplits.length);
ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(primaryKeyFieldPermutation,
primaryHashFunFactories, ordersSplits.length);
@@ -272,7 +282,7 @@
protected void destroyPrimaryIndex() throws Exception {
JobSpecification spec = new JobSpecification();
IndexDropOperatorDescriptor primaryDropOp =
- new IndexDropOperatorDescriptor(spec, primaryHelperFactory, getPartitionsMap(1));
+ new IndexDropOperatorDescriptor(spec, primaryHelperFactory, TestUtils.getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryDropOp, NC1_ID);
spec.addRoot(primaryDropOp);
runTest(spec);
@@ -281,7 +291,7 @@
protected void destroySecondaryIndex() throws Exception {
JobSpecification spec = new JobSpecification();
IndexDropOperatorDescriptor secondaryDropOp =
- new IndexDropOperatorDescriptor(spec, secondaryHelperFactory, getPartitionsMap(1));
+ new IndexDropOperatorDescriptor(spec, secondaryHelperFactory, TestUtils.getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryDropOp, NC1_ID);
spec.addRoot(secondaryDropOp);
runTest(spec);
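Note on the numPartitions computation above: the number of storage partitions a task serves is the total count of entries across all rows of its partitions map, not the number of rows, so the sum-based form is the correct one. A minimal self-contained sketch (class and method names hypothetical, not part of the patch):

    import java.util.Arrays;

    final class PartitionsMapUtil {
        // Total storage partitions covered by a map, e.g. {{0, 1}, {2}} -> 3,
        // whereas counting rows would incorrectly yield 2.
        static int totalPartitions(int[][] partitionsMap) {
            return Arrays.stream(partitionsMap).mapToInt(row -> row.length).sum();
        }
    }

With the single-partition maps used in these tests the two forms happen to agree, which is why the tests pass either way.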
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
index 3b16bb8..d5ad7da 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
@@ -24,9 +24,11 @@
import java.io.DataOutput;
import java.io.File;
+import java.util.Arrays;
import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
@@ -77,9 +79,9 @@
import org.apache.hyracks.storage.common.IStorageManager;
import org.apache.hyracks.test.support.TestStorageManager;
import org.apache.hyracks.test.support.TestStorageManagerComponentHolder;
+import org.apache.hyracks.test.support.TestUtils;
import org.apache.hyracks.tests.am.common.ITreeIndexOperatorTestHelper;
import org.apache.hyracks.tests.integration.AbstractIntegrationTest;
-import org.apache.hyracks.tests.integration.TestUtil;
import org.junit.After;
import org.junit.Before;
@@ -107,6 +109,8 @@
protected final int primaryKeyFieldCount = 1;
protected final IBinaryComparatorFactory[] primaryComparatorFactories =
new IBinaryComparatorFactory[primaryKeyFieldCount];
+ protected final IBinaryHashFunctionFactory[] primaryHashFactories =
+ new IBinaryHashFunctionFactory[primaryKeyFieldCount];
protected final RecordDescriptor primaryRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
@@ -178,6 +182,7 @@
primaryTypeTraits[8] = UTF8StringPointable.TYPE_TRAITS;
primaryTypeTraits[9] = UTF8StringPointable.TYPE_TRAITS;
primaryComparatorFactories[0] = UTF8StringBinaryComparatorFactory.INSTANCE;
+ primaryHashFactories[0] = primaryHashFunFactories[0];
// field, type and key declarations for secondary indexes
secondaryTypeTraits[0] = DoublePointable.TYPE_TRAITS;
@@ -238,7 +243,7 @@
IIndexBuilderFactory indexBuilderFactory =
new IndexBuilderFactory(storageManager, primarySplitProvider, btreeFactory, false);
IndexCreateOperatorDescriptor primaryCreateOp =
- new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, getPartitionsMap(1));
+ new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtils.getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
spec.addRoot(primaryCreateOp);
runTest(spec);
@@ -275,8 +280,12 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID);
int[] fieldPermutation = { 0, 1, 2, 4, 5, 7, 9, 10, 11, 12 };
- TreeIndexBulkLoadOperatorDescriptor primaryBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- primaryRecDesc, fieldPermutation, 0.7f, false, 1000L, true, primaryHelperFactory);
+ int[][] partitionsMap = TestUtils.getPartitionsMap(1);
+ ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(primaryKeyFieldPermutation,
+ primaryHashFunFactories, ordersSplits.length);
+ TreeIndexBulkLoadOperatorDescriptor primaryBulkLoad =
+ new TreeIndexBulkLoadOperatorDescriptor(spec, primaryRecDesc, fieldPermutation, 0.7f, false, 1000L,
+ true, primaryHelperFactory, null, tuplePartitionerFactory, partitionsMap);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBulkLoad, NC1_ID);
NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
@@ -297,7 +306,7 @@
IndexBuilderFactory indexBuilderFactory =
new IndexBuilderFactory(storageManager, secondarySplitProvider, rtreeFactory, false);
IndexCreateOperatorDescriptor secondaryCreateOp =
- new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, getPartitionsMap(1));
+ new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtils.getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryCreateOp, NC1_ID);
spec.addRoot(secondaryCreateOp);
runTest(spec);
@@ -333,8 +342,14 @@
// load secondary index
int[] fieldPermutation = { 6, 7, 8, 9, 0 };
- TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- secondaryRecDesc, fieldPermutation, 0.7f, false, 1000L, true, secondaryHelperFactory);
+ int[] pkFields = { 4 };
+ int[][] partitionsMap = TestUtils.getPartitionsMap(1);
+ int numPartitions = Arrays.stream(partitionsMap).mapToInt(partitions -> partitions.length).sum();
+ ITuplePartitionerFactory partitionerFactory =
+ new FieldHashPartitionerFactory(pkFields, primaryHashFactories, numPartitions);
+ TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoad =
+ new TreeIndexBulkLoadOperatorDescriptor(spec, secondaryRecDesc, fieldPermutation, 0.7f, false, 1000L,
+ true, secondaryHelperFactory, null, partitionerFactory, partitionsMap);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBulkLoad, NC1_ID);
NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
@@ -375,7 +390,7 @@
ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
- int[][] partitionsMap = TestUtil.getPartitionsMap(ordersSplits.length);
+ int[][] partitionsMap = TestUtils.getPartitionsMap(ordersSplits.length);
// insert into primary index
int[] primaryFieldPermutation = { 0, 1, 2, 4, 5, 7, 9, 10, 11, 12 };
ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(primaryKeyFieldPermutation,
@@ -412,7 +427,7 @@
protected void destroyPrimaryIndex() throws Exception {
JobSpecification spec = new JobSpecification();
IndexDropOperatorDescriptor primaryDropOp =
- new IndexDropOperatorDescriptor(spec, primaryHelperFactory, getPartitionsMap(1));
+ new IndexDropOperatorDescriptor(spec, primaryHelperFactory, TestUtils.getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryDropOp, NC1_ID);
spec.addRoot(primaryDropOp);
runTest(spec);
@@ -421,7 +436,7 @@
protected void destroySecondaryIndex() throws Exception {
JobSpecification spec = new JobSpecification();
IndexDropOperatorDescriptor secondaryDropOp =
- new IndexDropOperatorDescriptor(spec, secondaryHelperFactory, getPartitionsMap(1));
+ new IndexDropOperatorDescriptor(spec, secondaryHelperFactory, TestUtils.getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryDropOp, NC1_ID);
spec.addRoot(secondaryDropOp);
runTest(spec);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
index ee56375..6f59ed8 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -236,12 +236,4 @@
outputFiles.add(fileRef.getFile());
return new ManagedFileSplit(ncs.getId(), fileName);
}
-
- public static int[][] getPartitionsMap(int numPartitions) {
- int[][] map = new int[numPartitions][1];
- for (int i = 0; i < numPartitions; i++) {
- map[i] = new int[] { i };
- }
- return map;
- }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestUtil.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestUtil.java
index 8f3181d..73d985d 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestUtil.java
@@ -68,12 +68,4 @@
static ObjectNode httpGetAsObject(URI uri) throws URISyntaxException, IOException {
return getResultAsJson(httpGetAsString(uri));
}
-
- public static int[][] getPartitionsMap(int numPartitions) {
- int[][] map = new int[numPartitions][1];
- for (int i = 0; i < numPartitions; i++) {
- map[i] = new int[] { i };
- }
- return map;
- }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index 12b6ae6..f401e45 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -21,6 +21,8 @@
import java.nio.ByteBuffer;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitioner;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -31,32 +33,49 @@
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
import org.apache.hyracks.storage.am.common.api.ITupleFilter;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
+import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
import org.apache.hyracks.storage.common.buffercache.NoOpPageWriteCallback;
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+
public class IndexBulkLoadOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+
protected final IHyracksTaskContext ctx;
protected final float fillFactor;
protected final boolean verifyInput;
protected final long numElementsHint;
protected final boolean checkIfEmptyIndex;
- protected final IIndexDataflowHelper indexHelper;
+ protected final IIndexDataflowHelper[] indexHelpers;
protected final RecordDescriptor recDesc;
protected final PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
protected final ITupleFilterFactory tupleFilterFactory;
+ protected final ITuplePartitioner tuplePartitioner;
+ protected final int[] partitions;
+ protected final Int2IntMap storagePartitionId2Index;
protected FrameTupleAccessor accessor;
- protected IIndex index;
- protected IIndexBulkLoader bulkLoader;
+ protected final IIndex[] indexes;
+ protected final IIndexBulkLoader[] bulkLoaders;
protected ITupleFilter tupleFilter;
protected FrameTupleReference frameTuple;
- public IndexBulkLoadOperatorNodePushable(IIndexDataflowHelperFactory indexDataflowHelperFactory,
- IHyracksTaskContext ctx, int partition, int[] fieldPermutation, float fillFactor, boolean verifyInput,
- long numElementsHint, boolean checkIfEmptyIndex, RecordDescriptor recDesc,
- ITupleFilterFactory tupleFilterFactory) throws HyracksDataException {
+ public IndexBulkLoadOperatorNodePushable(IIndexDataflowHelperFactory indexHelperFactory, IHyracksTaskContext ctx,
+ int partition, int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint,
+ boolean checkIfEmptyIndex, RecordDescriptor recDesc, ITupleFilterFactory tupleFilterFactory,
+ ITuplePartitionerFactory partitionerFactory, int[][] partitionsMap) throws HyracksDataException {
this.ctx = ctx;
- this.indexHelper = indexDataflowHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
+ this.partitions = partitionsMap[partition];
+ this.tuplePartitioner = partitionerFactory.createPartitioner(ctx);
+ this.storagePartitionId2Index = new Int2IntOpenHashMap();
+ this.indexes = new IIndex[partitions.length];
+ this.indexHelpers = new IIndexDataflowHelper[partitions.length];
+ this.bulkLoaders = new IIndexBulkLoader[partitions.length];
+ for (int i = 0; i < partitions.length; i++) {
+ storagePartitionId2Index.put(partitions[i], i);
+ indexHelpers[i] = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partitions[i]);
+ }
this.fillFactor = fillFactor;
this.verifyInput = verifyInput;
this.numElementsHint = numElementsHint;
@@ -69,15 +88,18 @@
@Override
public void open() throws HyracksDataException {
accessor = new FrameTupleAccessor(recDesc);
- indexHelper.open();
- index = indexHelper.getIndexInstance();
+ for (int i = 0; i < indexHelpers.length; i++) {
+ indexHelpers[i].open();
+ indexes[i] = indexHelpers[i].getIndexInstance();
+ initializeBulkLoader(indexes[i], i);
+ }
+
try {
writer.open();
if (tupleFilterFactory != null) {
tupleFilter = tupleFilterFactory.createTupleFilter(ctx);
frameTuple = new FrameTupleReference();
}
- initializeBulkLoader();
} catch (Exception e) {
throw HyracksDataException.create(e);
}
@@ -94,8 +116,10 @@
continue;
}
}
+ int storagePartition = tuplePartitioner.partition(accessor, i);
+ int storageIdx = storagePartitionId2Index.get(storagePartition);
tuple.reset(accessor, i);
- bulkLoader.add(tuple);
+ bulkLoaders[storageIdx].add(tuple);
}
FrameUtils.flushFrame(buffer, writer);
@@ -104,20 +128,14 @@
@Override
public void close() throws HyracksDataException {
try {
- // bulkloader can be null if an exception is thrown before it is initialized.
- if (bulkLoader != null) {
- bulkLoader.end();
- }
+ closeBulkLoaders();
} catch (Throwable th) {
throw HyracksDataException.create(th);
} finally {
- if (index != null) {
- // If index was opened!
- try {
- indexHelper.close();
- } finally {
- writer.close();
- }
+ try {
+ closeIndexes(indexes, indexHelpers);
+ } finally {
+ writer.close();
}
}
}
@@ -129,13 +147,33 @@
@Override
public void fail() throws HyracksDataException {
- if (index != null) {
- writer.fail();
+ writer.fail();
+ }
+
+ protected void initializeBulkLoader(IIndex index, int indexId) throws HyracksDataException {
+ bulkLoaders[indexId] = index.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
+ NoOpPageWriteCallback.INSTANCE);
+ }
+
+ private void closeBulkLoaders() throws HyracksDataException {
+ for (IIndexBulkLoader bulkLoader : bulkLoaders) {
+ // bulkloader can be null if an exception is thrown before it is initialized.
+ if (bulkLoader != null) {
+ bulkLoader.end();
+ }
}
}
- protected void initializeBulkLoader() throws HyracksDataException {
- bulkLoader = index.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
- NoOpPageWriteCallback.INSTANCE);
+ protected static void closeIndexes(IIndex[] indexes, IIndexDataflowHelper[] indexHelpers)
+ throws HyracksDataException {
+ Throwable failure = null;
+ for (int i = 0; i < indexes.length; i++) {
+ if (indexes[i] != null) {
+ failure = ResourceReleaseUtils.close(indexHelpers[i], failure);
+ }
+ }
+ if (failure != null) {
+ throw HyracksDataException.create(failure);
+ }
}
}
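For illustration, the per-tuple routing the reworked pushable performs (hash the key fields to a global storage partition, translate that id to a dense local slot, append to that slot's bulk loader) can be reduced to plain Java. The class below is a hypothetical stand-alone sketch, not part of the patch:

    import java.util.HashMap;
    import java.util.Map;

    final class PartitionRoutingSketch {
        public static void main(String[] args) {
            // Suppose partitionsMap[partition] assigned storage partitions 3 and 7 to this task.
            int[] partitions = { 3, 7 };

            // Dense mapping from global storage-partition id to a local array slot,
            // mirroring storagePartitionId2Index above (fastutil's Int2IntOpenHashMap
            // plays the same role without boxing).
            Map<Integer, Integer> storagePartitionId2Index = new HashMap<>();
            for (int i = 0; i < partitions.length; i++) {
                storagePartitionId2Index.put(partitions[i], i);
            }

            // A tuple hashed to storage partition 7 (stand-in for
            // tuplePartitioner.partition(accessor, i)) lands in bulkLoaders[1].
            int storagePartition = 7;
            int slot = storagePartitionId2Index.get(storagePartition);
            System.out.println("storage partition " + storagePartition + " -> bulkLoaders[" + slot + "]");
        }
    }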
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
index 8346f62..351fbf6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
@@ -22,6 +22,7 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -30,27 +31,23 @@
public class TreeIndexBulkLoadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
protected final int[] fieldPermutation;
protected final float fillFactor;
protected final boolean verifyInput;
protected final long numElementsHint;
protected final boolean checkIfEmptyIndex;
+ protected final int[][] partitionsMap;
+ protected final ITuplePartitionerFactory partitionerFactory;
protected final IIndexDataflowHelperFactory indexHelperFactory;
- private final ITupleFilterFactory tupleFilterFactory;
-
- public TreeIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
- int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint,
- boolean checkIfEmptyIndex, IIndexDataflowHelperFactory indexHelperFactory) {
- this(spec, outRecDesc, fieldPermutation, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
- indexHelperFactory, null);
- }
+ protected final ITupleFilterFactory tupleFilterFactory;
public TreeIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint,
boolean checkIfEmptyIndex, IIndexDataflowHelperFactory indexHelperFactory,
- ITupleFilterFactory tupleFilterFactory) {
+ ITupleFilterFactory tupleFilterFactory, ITuplePartitionerFactory partitionerFactory,
+ int[][] partitionsMap) {
super(spec, 1, 1);
this.indexHelperFactory = indexHelperFactory;
this.fieldPermutation = fieldPermutation;
@@ -60,6 +57,8 @@
this.checkIfEmptyIndex = checkIfEmptyIndex;
this.outRecDescs[0] = outRecDesc;
this.tupleFilterFactory = tupleFilterFactory;
+ this.partitionerFactory = partitionerFactory;
+ this.partitionsMap = partitionsMap;
}
@Override
@@ -67,6 +66,7 @@
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
return new IndexBulkLoadOperatorNodePushable(indexHelperFactory, ctx, partition, fieldPermutation, fillFactor,
verifyInput, numElementsHint, checkIfEmptyIndex,
- recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0), tupleFilterFactory);
+ recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0), tupleFilterFactory,
+ partitionerFactory, partitionsMap);
}
}
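Callers now pass the partitioner factory and partitions map explicitly, as the test updates above show. A minimal wiring sketch for the single-partition case, with all variables assumed to be in scope from the surrounding test:

    // One task, one storage partition: map[0] = {0}.
    int[][] partitionsMap = TestUtils.getPartitionsMap(1);
    // Route each tuple by hashing its primary-key fields over one target partition.
    ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(
            primaryKeyFieldPermutation, primaryHashFunFactories, 1);
    TreeIndexBulkLoadOperatorDescriptor bulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
            primaryRecDesc, fieldPermutation, 0.7f, true, 1000L, true, primaryHelperFactory,
            null /* no tuple filter */, partitionerFactory, partitionsMap);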
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
index 2348a1a..8ac5dfc 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
@@ -190,4 +190,12 @@
context.updateLoggers();
}
}
+
+ public static int[][] getPartitionsMap(int numPartitions) {
+ int[][] map = new int[numPartitions][1];
+ for (int i = 0; i < numPartitions; i++) {
+ map[i] = new int[] { i };
+ }
+ return map;
+ }
}
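For a quick sense of the shape this helper produces, a runnable sketch of the same logic:

    import java.util.Arrays;

    final class PartitionsMapDemo {
        public static void main(String[] args) {
            // Identity mapping: task i owns exactly storage partition i.
            int numPartitions = 3;
            int[][] map = new int[numPartitions][];
            for (int i = 0; i < numPartitions; i++) {
                map[i] = new int[] { i };
            }
            System.out.println(Arrays.deepToString(map)); // prints [[0], [1], [2]]
        }
    }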