[ASTERIXDB-3144][RT] Introduce DataPartitioningProvider
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Add storage partitioning scheme config (dynamic or static) and
default it to dynamic.
- Introduce DataPartitioningProvider, which encapsulates the logic
for dataset partitioning based on the partitioning scheme.
Change-Id: Ia2bbc716fb4c2e9abca06e8f8629b15bd48bc7f3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17503
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Murtadha Hubail <mhubail@apache.org>
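Usage note: the new option belongs to the COMMON config section. A minimal
sketch of setting it (the option name and values come from this change; the
surrounding ini layout is assumed):

    [common]
    ; either "dynamic" (the default) or "static"; must not be changed
    ; after any datasets have been created
    storage.partitioning = static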
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
index 8865bb2..6c6b2aa 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.common.config.DatasetConfig;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
@@ -241,10 +242,9 @@
propsLocal.add(new LocalOrderProperty(orderColumns));
MetadataProvider mp = (MetadataProvider) context.getMetadataProvider();
Dataset dataset = mp.findDataset(searchIndex.getDataverseName(), searchIndex.getDatasetName());
- int[][] partitionsMap = mp.getPartitionsMap(dataset);
- pv[0] = new StructuralPropertiesVector(
- UnorderedPartitionedProperty.ofPartitionsMap(searchKeyVars, domain, partitionsMap),
- propsLocal);
+ PartitioningProperties partitioningProperties = mp.getPartitioningProperties(dataset);
+ pv[0] = new StructuralPropertiesVector(UnorderedPartitionedProperty.ofPartitionsMap(searchKeyVars,
+ domain, partitioningProperties.getComputeStorageMap()), propsLocal);
return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
}
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 5bdb2db..20334bf 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -20,6 +20,7 @@
import static org.apache.asterix.common.utils.IdentifierUtil.dataset;
+import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.common.config.OptimizationConfUtil;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.declared.DataSourceId;
@@ -58,7 +59,6 @@
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -171,8 +171,7 @@
}
IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(unnestMap);
RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataset, indexName);
+ PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset, indexName);
// TODO: Here we assume there is only one search key field.
int queryField = keyFields[0];
// Get tokenizer and search modifier factories.
@@ -183,12 +182,9 @@
IFullTextConfigEvaluatorFactory fullTextConfigEvaluatorFactory =
FullTextUtil.fetchFilterAndCreateConfigEvaluator(metadataProvider, secondaryIndex.getDataverseName(),
((Index.TextIndexDetails) secondaryIndex.getIndexDetails()).getFullTextConfigName());
- IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(
- metadataProvider.getStorageComponentProvider().getStorageManager(), secondarySplitsAndConstraint.first);
-
- int numPartitions = MetadataProvider.getNumPartitions(secondarySplitsAndConstraint.second);
- int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions);
-
+ IIndexDataflowHelperFactory dataflowHelperFactory =
+ new IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
+ partitioningProperties.getSplitsProvider());
LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp =
new LSMInvertedIndexSearchOperatorDescriptor(jobSpec, outputRecDesc, queryField, dataflowHelperFactory,
queryTokenizerFactory, fullTextConfigEvaluatorFactory, searchModifierFactory, retainInput,
@@ -196,7 +192,8 @@
dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(), secondaryIndex,
IndexOperation.SEARCH, null),
minFilterFieldIndexes, maxFilterFieldIndexes, isFullTextSearchQuery, numPrimaryKeys,
- propagateIndexFilter, nonFilterWriterFactory, frameLimit, partitionsMap);
- return new Pair<>(invIndexSearchOp, secondarySplitsAndConstraint.second);
+ propagateIndexFilter, nonFilterWriterFactory, frameLimit,
+ partitioningProperties.getComputeStorageMap());
+ return new Pair<>(invIndexSearchOp, partitioningProperties.getConstraints());
}
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
index 24a7fb2..7b8567f 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
@@ -26,6 +26,7 @@
import java.util.TreeMap;
import org.apache.asterix.algebra.operators.CommitOperator;
+import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.common.config.DatasetConfig;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
@@ -361,9 +362,9 @@
outputVars.add(outputVar);
VariableUtilities.substituteVariables(lop, inputVar, outputVar, context);
}
-
- int[][] partitionsMap = metadataProvider.getPartitionsMap(idx);
- IntersectOperator intersect = new IntersectOperator(outputVars, inputVars, partitionsMap);
+ PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(idx);
+ IntersectOperator intersect =
+ new IntersectOperator(outputVars, inputVars, partitioningProperties.getComputeStorageMap());
intersect.setSourceLocation(lop.getSourceLocation());
for (ILogicalOperator secondarySearch : subRoots) {
intersect.getInputs().add(secondarySearch.getInputs().get(0));
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
index 880880e..a2d99a0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
@@ -47,6 +47,7 @@
import org.apache.asterix.common.config.TransactionProperties;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.dataflow.IDataPartitioningProvider;
import org.apache.asterix.common.external.IAdapterFactoryService;
import org.apache.asterix.common.metadata.IMetadataBootstrap;
import org.apache.asterix.common.metadata.IMetadataLockUtil;
@@ -54,6 +55,7 @@
import org.apache.asterix.common.storage.ICompressionManager;
import org.apache.asterix.common.transactions.IResourceIdManager;
import org.apache.asterix.common.transactions.ITxnIdFactory;
+import org.apache.asterix.metadata.utils.DataPartitioningProvider;
import org.apache.asterix.runtime.compression.CompressionManager;
import org.apache.asterix.runtime.job.listener.NodeJobTracker;
import org.apache.asterix.runtime.transaction.ResourceIdManager;
@@ -112,6 +114,7 @@
private final IConfigValidator configValidator;
private final IAdapterFactoryService adapterFactoryService;
private final ReentrantReadWriteLock compilationLock = new ReentrantReadWriteLock(true);
+ private final IDataPartitioningProvider dataPartitioningProvider;
public CcApplicationContext(ICCServiceContext ccServiceCtx, HyracksConnection hcc,
Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager,
@@ -154,6 +157,7 @@
requestTracker = new RequestTracker(this);
configValidator = configValidatorFactory.create();
this.adapterFactoryService = adapterFactoryService;
+ dataPartitioningProvider = new DataPartitioningProvider(this);
}
@Override
@@ -357,4 +361,9 @@
public ReentrantReadWriteLock getCompilationLock() {
return compilationLock;
}
+
+ @Override
+ public IDataPartitioningProvider getDataPartitioningProvider() {
+ return dataPartitioningProvider;
+ }
}
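Note: consumers obtain the provider through the CC application context; a
minimal sketch (the downcast mirrors what MetadataProvider's constructor does
in this change):

    static DataPartitioningProvider partitioningProviderOf(ICcApplicationContext appCtx) {
        // ICcApplicationContext exposes the interface; the concrete type lives in asterix-metadata
        return (DataPartitioningProvider) appCtx.getDataPartitioningProvider();
    }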
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
index f470949..d5743cb 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
@@ -18,15 +18,13 @@
*/
package org.apache.asterix.utils;
+import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.runtime.utils.RuntimeUtils;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
-import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
public class DataverseUtil {
@@ -35,10 +33,11 @@
public static JobSpecification dropDataverseJobSpec(Dataverse dataverse, MetadataProvider metadata) {
JobSpecification jobSpec = RuntimeUtils.createJobSpecification(metadata.getApplicationContext());
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- metadata.splitAndConstraints(dataverse.getDataverseName());
- FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(jobSpec, splitsAndConstraint.first, false);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, frod, splitsAndConstraint.second);
+ PartitioningProperties partitioningProperties = metadata.splitAndConstraints(dataverse.getDataverseName());
+ FileRemoveOperatorDescriptor frod =
+ new FileRemoveOperatorDescriptor(jobSpec, partitioningProperties.getSplitsProvider(), false);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, frod,
+ partitioningProperties.getConstraints());
jobSpec.addRoot(frod);
return jobSpec;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
index ff99327..f012a4e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
@@ -19,6 +19,7 @@
package org.apache.asterix.utils;
+import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.common.config.CompilerProperties;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.common.transactions.TxnId;
@@ -29,7 +30,6 @@
import org.apache.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
-import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
@@ -38,7 +38,6 @@
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
public class FlushDatasetUtil {
private FlushDatasetUtil() {
@@ -66,9 +65,9 @@
spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, flushOperator, 0);
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataset, dataset.getDatasetName());
- AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second;
+ PartitioningProperties partitioningProperties =
+ metadataProvider.getPartitioningProperties(dataset, dataset.getDatasetName());
+ AlgebricksPartitionConstraint primaryPartitionConstraint = partitioningProperties.getConstraints();
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, emptySource,
primaryPartitionConstraint);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 3ddbfd9..682b636 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -214,7 +214,7 @@
fieldPermutation[i] = i;
}
int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length;
- int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions);
+ int[][] partitionsMap = getPartitionsMap(numPartitions);
IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories;
ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(
primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
@@ -263,7 +263,7 @@
}
int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length;
- int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions);
+ int[][] partitionsMap = getPartitionsMap(numPartitions);
IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories;
ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(
primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
@@ -372,7 +372,7 @@
IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length;
- int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions);
+ int[][] partitionsMap = getPartitionsMap(numPartitions);
IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories;
ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(
primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
@@ -838,7 +838,7 @@
IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length;
- int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions);
+ int[][] partitionsMap = getPartitionsMap(numPartitions);
IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories;
ITuplePartitionerFactory tuplePartitionerFactory =
new FieldHashPartitionerFactory(primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
@@ -912,4 +912,12 @@
}
return new RecordDescriptor(outputSerDes, outputTypeTraits);
}
+
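+ // Identity compute-to-storage map: compute partition i owns storage partition i.
+ // Local copy of the static helper removed from MetadataProvider in this change.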
+ private static int[][] getPartitionsMap(int numPartitions) {
+ int[][] map = new int[numPartitions][1];
+ for (int i = 0; i < numPartitions; i++) {
+ map[i] = new int[] { i };
+ }
+ return map;
+ }
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 4e96365..801cfb0 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -54,6 +54,7 @@
"storage.compression.block" : "snappy",
"storage.global.cleanup.timeout" : 600,
"storage.lsm.bloomfilter.falsepositiverate" : 0.01,
+ "storage.partitioning" : "dynamic",
"txn\.commitprofiler\.enabled" : false,
"txn\.commitprofiler\.reportinterval" : 5,
"txn\.dataset\.checkpoint\.interval" : 3600,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 415e96d..e54bd70 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -54,6 +54,7 @@
"storage.compression.block" : "snappy",
"storage.global.cleanup.timeout" : 600,
"storage.lsm.bloomfilter.falsepositiverate" : 0.01,
+ "storage.partitioning" : "dynamic",
"txn\.commitprofiler\.enabled" : false,
"txn\.commitprofiler\.reportinterval" : 5,
"txn\.dataset\.checkpoint\.interval" : 3600,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index add09ca..c5496b2 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -54,6 +54,7 @@
"storage.compression.block" : "snappy",
"storage.global.cleanup.timeout" : 600,
"storage.lsm.bloomfilter.falsepositiverate" : 0.01,
+ "storage.partitioning" : "dynamic",
"txn\.commitprofiler\.enabled" : false,
"txn\.commitprofiler\.reportinterval" : 5,
"txn\.dataset\.checkpoint\.interval" : 3600,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/PartitioningProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/PartitioningProperties.java
new file mode 100644
index 0000000..1580ca4
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/PartitioningProperties.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
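+/**
+ * Encapsulates the partitioning properties of a dataset (or one of its
+ * indexes): the file splits provider, the partition constraints, and the
+ * compute-to-storage partitions map.
+ */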
+public class PartitioningProperties {
+ private final IFileSplitProvider splitsProvider;
+ private final AlgebricksPartitionConstraint constraints;
+ private final int[][] computeStorageMap;
+
+ private PartitioningProperties(IFileSplitProvider splitsProvider, AlgebricksPartitionConstraint constraints,
+ int[][] computeStorageMap) {
+ this.splitsProvider = splitsProvider;
+ this.constraints = constraints;
+ this.computeStorageMap = computeStorageMap;
+ }
+
+ public static PartitioningProperties of(IFileSplitProvider splitsProvider,
+ AlgebricksPartitionConstraint constraints, int[][] computeStorageMap) {
+ return new PartitioningProperties(splitsProvider, constraints, computeStorageMap);
+ }
+
+ public IFileSplitProvider getSplitsProvider() {
+ return splitsProvider;
+ }
+
+ public AlgebricksPartitionConstraint getConstraints() {
+ return constraints;
+ }
+
+ public int[][] getComputeStorageMap() {
+ return computeStorageMap;
+ }
+
+ public int getNumberOfPartitions() {
+ return splitsProvider.getFileSplits().length;
+ }
+}
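Note: call sites that previously threaded a Pair<IFileSplitProvider,
AlgebricksPartitionConstraint> plus a separate int[][] partitions map now pass
this single object. A minimal sketch, assuming splits, constraints, and map
are already in scope:

    PartitioningProperties pp = PartitioningProperties.of(splits, constraints, map);
    IFileSplitProvider sp = pp.getSplitsProvider();         // where the index files live
    AlgebricksPartitionConstraint pc = pp.getConstraints(); // where the operators run
    int[][] csm = pp.getComputeStorageMap();                // compute -> storage partitions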
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
index 073da97..40bcfb0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
@@ -32,6 +32,7 @@
import java.util.function.Function;
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.common.utils.PartitioningScheme;
import org.apache.hyracks.api.config.IApplicationConfig;
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.config.IOptionType;
@@ -64,7 +65,8 @@
STORAGE_GLOBAL_CLEANUP_TIMEOUT(POSITIVE_INTEGER, (int) TimeUnit.MINUTES.toSeconds(10)),
STORAGE_COLUMN_MAX_TUPLE_COUNT(NONNEGATIVE_INTEGER, 15000),
STORAGE_COLUMN_FREE_SPACE_TOLERANCE(DOUBLE, 0.15),
- STORAGE_FORMAT(STRING, "row");
+ STORAGE_FORMAT(STRING, "row"),
+ STORAGE_PARTITIONING(STRING, "dynamic");
private final IOptionType interpreter;
private final Object defaultValue;
@@ -81,6 +83,7 @@
case STORAGE_LSM_BLOOMFILTER_FALSEPOSITIVERATE:
case STORAGE_GLOBAL_CLEANUP:
case STORAGE_GLOBAL_CLEANUP_TIMEOUT:
+ case STORAGE_PARTITIONING:
return Section.COMMON;
default:
return Section.NC;
@@ -139,6 +142,9 @@
+ " 0.15 means a physical page with 15% or less empty space is tolerable)";
case STORAGE_FORMAT:
return "The default storage format (either row or column)";
+ case STORAGE_PARTITIONING:
+ return "The storage partitioning scheme (either dynamic or static). This value should not be changed"
+ + " after any dataset have been created";
default:
throw new IllegalStateException("NYI: " + this);
}
@@ -287,4 +293,8 @@
public String getStorageFormat() {
return accessor.getString(Option.STORAGE_FORMAT);
}
+
+ public PartitioningScheme getPartitioningScheme() {
+ return PartitioningScheme.fromName(accessor.getString(Option.STORAGE_PARTITIONING));
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index c22ac4c..281b069 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -158,4 +158,11 @@
* @return the cluster query compilation lock
*/
ReentrantReadWriteLock getCompilationLock();
+
+ /**
+ * Gets the data partitioning provider
+ *
+ * @return the data partitioning provider
+ */
+ IDataPartitioningProvider getDataPartitioningProvider();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
new file mode 100644
index 0000000..e59d4e7
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.dataflow;
+
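+/**
+ * Marker interface for the provider of dataset partitioning information;
+ * implemented by DataPartitioningProvider in asterix-metadata.
+ */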
+public interface IDataPartitioningProvider {
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/PartitioningScheme.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/PartitioningScheme.java
new file mode 100644
index 0000000..bdfa7c5
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/PartitioningScheme.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.utils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum PartitioningScheme {
+ DYNAMIC("dynamic"),
+ STATIC("static");
+
+ private static final Map<String, PartitioningScheme> partitioningSchemes =
+ Collections.unmodifiableMap(Arrays.stream(PartitioningScheme.values())
+ .collect(Collectors.toMap(PartitioningScheme::getStr, Function.identity())));
+
+ private final String str;
+
+ PartitioningScheme(String str) {
+ this.str = str;
+ }
+
+ public String getStr() {
+ return str;
+ }
+
+ public static PartitioningScheme fromName(String name) {
+ PartitioningScheme partitioningScheme = partitioningSchemes.get(name.toLowerCase());
+ if (partitioningScheme == null) {
+ throw new IllegalArgumentException("unknonw partitioning scheme: " + name);
+ }
+ return partitioningScheme;
+ }
+}
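Note: fromName lower-cases its argument before the lookup, so matching is
case-insensitive; a quick sketch:

    PartitioningScheme.fromName("DYNAMIC"); // returns DYNAMIC
    PartitioningScheme.fromName("static");  // returns STATIC
    PartitioningScheme.fromName("hash");    // throws IllegalArgumentException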
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java
index f51e474..3f3482a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java
@@ -22,6 +22,7 @@
import java.util.List;
import java.util.Set;
+import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.ListSet;
@@ -65,8 +66,10 @@
String dsName = ((FeedDataSource) ds).getTargetDataset();
Dataset feedDs = ((MetadataProvider) ctx.getMetadataProvider())
.findDataset(ds.getId().getDataverseName(), dsName);
- int[][] partitionsMap1 = ((MetadataProvider) ctx.getMetadataProvider()).getPartitionsMap(feedDs);
- pp = getFeedDatasetPartitioningProperty(ds, domain, scanVariables, partitionsMap1);
+ PartitioningProperties partitioningProperties =
+ ((MetadataProvider) ctx.getMetadataProvider()).getPartitioningProperties(feedDs);
+ pp = getFeedDatasetPartitioningProperty(ds, domain, scanVariables,
+ partitioningProperties.getComputeStorageMap());
break;
case DataSource.Type.INTERNAL_DATASET:
case DataSource.Type.SAMPLE:
@@ -77,8 +80,9 @@
} else {
dataset = ((SampleDataSource) ds).getDataset();
}
- int[][] partitionsMap = ((MetadataProvider) ctx.getMetadataProvider()).getPartitionsMap(dataset);
- pp = getInternalDatasetPartitioningProperty(ds, domain, scanVariables, pvars, partitionsMap);
+ int[][] computeStorageMap = ((MetadataProvider) ctx.getMetadataProvider())
+ .getPartitioningProperties(dataset).getComputeStorageMap();
+ pp = getInternalDatasetPartitioningProperty(ds, domain, scanVariables, pvars, computeStorageMap);
propsLocal.add(new LocalOrderProperty(getOrderColumns(pvars)));
break;
default:
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 97d5c39..c648b33 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -26,7 +26,6 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -35,6 +34,7 @@
import java.util.stream.Collectors;
import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
import org.apache.asterix.common.config.StorageProperties;
@@ -84,6 +84,7 @@
import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.metadata.entities.Synonym;
import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
+import org.apache.asterix.metadata.utils.DataPartitioningProvider;
import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.metadata.utils.FullTextUtil;
import org.apache.asterix.metadata.utils.IndexUtil;
@@ -106,7 +107,6 @@
import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
import org.apache.asterix.runtime.operators.LSMSecondaryUpsertWithNestedPlanOperatorDescriptor;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -191,6 +191,8 @@
private TxnId txnId;
private boolean blockingOperatorDisabled = false;
+ private final DataPartitioningProvider dataPartitioningProvider;
+
public static MetadataProvider create(ICcApplicationContext appCtx, Dataverse defaultDataverse) {
java.util.function.Function<ICcApplicationContext, IMetadataProvider<?, ?>> factory =
((ICCExtensionManager) appCtx.getExtensionManager()).getMetadataProviderFactory();
@@ -204,6 +206,7 @@
this.storageComponentProvider = appCtx.getStorageComponentProvider();
storageProperties = appCtx.getStorageProperties();
functionManager = ((IFunctionExtensionManager) appCtx.getExtensionManager()).getFunctionManager();
+ dataPartitioningProvider = (DataPartitioningProvider) appCtx.getDataPartitioningProvider();
locks = new LockList();
config = new HashMap<>();
}
@@ -575,8 +578,7 @@
int numPrimaryKeys = dataset.getPrimaryKeys().size();
RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
- getSplitProviderAndConstraints(dataset, theIndex.getIndexName());
+ PartitioningProperties datasetPartitioningProp = getPartitioningProperties(dataset, theIndex.getIndexName());
int[] primaryKeyFields = new int[numPrimaryKeys];
for (int i = 0; i < numPrimaryKeys; i++) {
primaryKeyFields[i] = i;
@@ -600,15 +602,16 @@
dataset.getSearchCallbackFactory(storageComponentProvider, theIndex, IndexOperation.SEARCH,
primaryKeyFields, primaryKeyFieldsInSecondaryIndex, proceedIndexOnlyPlan);
IStorageManager storageManager = getStorageComponentProvider().getStorageManager();
- IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(storageManager, spPc.first);
+ IIndexDataflowHelperFactory indexHelperFactory =
+ new IndexDataflowHelperFactory(storageManager, datasetPartitioningProp.getSplitsProvider());
BTreeSearchOperatorDescriptor btreeSearchOp;
- int numPartitions = getNumPartitions(spPc.second);
- int[][] partitionsMap = getPartitionsMap(numPartitions);
+ int[][] partitionsMap = datasetPartitioningProp.getComputeStorageMap();
ITuplePartitionerFactory tuplePartitionerFactory = null;
if (partitionInputTuples) {
IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
- tuplePartitionerFactory = new FieldHashPartitionerFactory(lowKeyFields, pkHashFunFactories, numPartitions);
+ tuplePartitionerFactory = new FieldHashPartitionerFactory(lowKeyFields, pkHashFunFactories,
+ datasetPartitioningProp.getNumberOfPartitions());
}
if (dataset.getDatasetType() == DatasetType.INTERNAL) {
@@ -627,7 +630,7 @@
} else {
btreeSearchOp = null;
}
- return new Pair<>(btreeSearchOp, spPc.second);
+ return new Pair<>(btreeSearchOp, datasetPartitioningProp.getConstraints());
}
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRtreeSearchRuntime(JobSpecification jobSpec,
@@ -645,8 +648,8 @@
}
Index.ValueIndexDetails secondaryIndexDetails = (Index.ValueIndexDetails) secondaryIndex.getIndexDetails();
RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
- getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
+ PartitioningProperties partitioningProperties =
+ getPartitioningProperties(dataset, secondaryIndex.getIndexName());
int[] primaryKeyFields = new int[numPrimaryKeys];
for (int i = 0; i < numPrimaryKeys; i++) {
primaryKeyFields[i] = i;
@@ -677,8 +680,8 @@
dataset.getSearchCallbackFactory(storageComponentProvider, secondaryIndex, IndexOperation.SEARCH,
primaryKeyFields, primaryKeyFieldsInSecondaryIndex, isIndexOnlyPlan);
RTreeSearchOperatorDescriptor rtreeSearchOp;
- IIndexDataflowHelperFactory indexDataflowHelperFactory =
- new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), spPc.first);
+ IIndexDataflowHelperFactory indexDataflowHelperFactory = new IndexDataflowHelperFactory(
+ storageComponentProvider.getStorageManager(), partitioningProperties.getSplitsProvider());
if (dataset.getDatasetType() == DatasetType.INTERNAL) {
rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, keyFields, true, true,
indexDataflowHelperFactory, retainInput, retainMissing, nonMatchWriterFactory,
@@ -689,7 +692,7 @@
rtreeSearchOp = null;
}
- return new Pair<>(rtreeSearchOp, spPc.second);
+ return new Pair<>(rtreeSearchOp, partitioningProperties.getConstraints());
}
@Override
@@ -755,26 +758,23 @@
fieldPermutation[numKeys + 1] = idx;
}
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- getSplitProviderAndConstraints(dataset);
+ PartitioningProperties partitioningProperties = getPartitioningProperties(dataset);
long numElementsHint = getCardinalityPerPartitionHint(dataset);
// TODO
// figure out the right behavior of the bulkload and then give the
// right callback
// (ex. what's the expected behavior when there is an error during
// bulkload?)
- int[][] partitionsMap = getPartitionsMap(dataset);
- int numPartitions = (int) Arrays.stream(partitionsMap).map(partitions -> partitions.length).count();
IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
- ITuplePartitionerFactory partitionerFactory =
- new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
- IIndexDataflowHelperFactory indexHelperFactory =
- new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
- LSMIndexBulkLoadOperatorDescriptor btreeBulkLoad =
- new LSMIndexBulkLoadOperatorDescriptor(spec, null, fieldPermutation,
- StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true, indexHelperFactory,
- null, BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory, partitionsMap);
- return new Pair<>(btreeBulkLoad, splitsAndConstraint.second);
+ ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories,
+ partitioningProperties.getNumberOfPartitions());
+ IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
+ storageComponentProvider.getStorageManager(), partitioningProperties.getSplitsProvider());
+ LSMIndexBulkLoadOperatorDescriptor btreeBulkLoad = new LSMIndexBulkLoadOperatorDescriptor(spec, null,
+ fieldPermutation, StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true,
+ indexHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory,
+ partitioningProperties.getComputeStorageMap());
+ return new Pair<>(btreeBulkLoad, partitioningProperties.getConstraints());
}
@Override
@@ -971,9 +971,8 @@
numKeyFields / 2);
}
- public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitAndConstraints(DataverseName dataverseName) {
- return SplitsAndConstraintsUtil.getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(),
- dataverseName);
+ public PartitioningProperties splitAndConstraints(DataverseName dataverseName) {
+ return dataPartitioningProvider.splitAndConstraints(dataverseName);
}
public FileSplit[] splitsForIndex(MetadataTransactionContext mdTxnCtx, Dataset dataset, String indexName)
@@ -1079,8 +1078,7 @@
Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), dataset.getDatasetName());
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- getSplitProviderAndConstraints(dataset);
+ PartitioningProperties partitioningProperties = getPartitioningProperties(dataset);
// prepare callback
int[] primaryKeyFields = new int[numKeys];
@@ -1089,21 +1087,19 @@
}
IModificationOperationCallbackFactory modificationCallbackFactory = dataset
.getModificationCallbackFactory(storageComponentProvider, primaryIndex, indexOp, primaryKeyFields);
- IIndexDataflowHelperFactory idfh =
- new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
-
- int numPartitions = getNumPartitions(splitsAndConstraint.second);
- int[][] partitionsMap = getPartitionsMap(numPartitions);
+ IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
+ partitioningProperties.getSplitsProvider());
IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
- ITuplePartitionerFactory partitionerFactory =
- new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
+ ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories,
+ partitioningProperties.getNumberOfPartitions());
IOperatorDescriptor op;
if (bulkload) {
long numElementsHint = getCardinalityPerPartitionHint(dataset);
op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
StorageConstants.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh, null,
- BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory, partitionsMap);
+ BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory,
+ partitioningProperties.getComputeStorageMap());
} else {
if (indexOp == IndexOperation.INSERT) {
ISearchOperationCallbackFactory searchCallbackFactory = dataset
@@ -1114,21 +1110,22 @@
.filter(Index::isPrimaryKeyIndex).findFirst();
IIndexDataflowHelperFactory pkidfh = null;
if (primaryKeyIndex.isPresent()) {
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primaryKeySplitsAndConstraint =
- getSplitProviderAndConstraints(dataset, primaryKeyIndex.get().getIndexName());
+ PartitioningProperties idxPartitioningProperties =
+ getPartitioningProperties(dataset, primaryKeyIndex.get().getIndexName());
pkidfh = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
- primaryKeySplitsAndConstraint.first);
+ idxPartitioningProperties.getSplitsProvider());
}
op = createLSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, pkidfh,
modificationCallbackFactory, searchCallbackFactory, numKeys, filterFields, partitionerFactory,
- partitionsMap);
+ partitioningProperties.getComputeStorageMap());
} else {
op = createLSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
- null, true, modificationCallbackFactory, partitionerFactory, partitionsMap);
+ null, true, modificationCallbackFactory, partitionerFactory,
+ partitioningProperties.getComputeStorageMap());
}
}
- return new Pair<>(op, splitsAndConstraint.second);
+ return new Pair<>(op, partitioningProperties.getConstraints());
}
protected LSMPrimaryInsertOperatorDescriptor createLSMPrimaryInsertOperatorDescriptor(JobSpecification spec,
@@ -1286,36 +1283,36 @@
// Index parameters.
Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), indexName);
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
+ PartitioningProperties partitioningProperties =
+ getPartitioningProperties(dataset, secondaryIndex.getIndexName());
// prepare callback
IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields);
IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(
- storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
-
- int numPartitions = getNumPartitions(splitsAndConstraint.second);
- int[][] partitionsMap = getPartitionsMap(numPartitions);
+ storageComponentProvider.getStorageManager(), partitioningProperties.getSplitsProvider());
IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
- ITuplePartitionerFactory partitionerFactory =
- new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
+ ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories,
+ partitioningProperties.getNumberOfPartitions());
IOperatorDescriptor op;
if (bulkload) {
long numElementsHint = getCardinalityPerPartitionHint(dataset);
op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh, null,
- BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory, partitionerFactory, partitionsMap);
+ BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory, partitionerFactory,
+ partitioningProperties.getComputeStorageMap());
} else if (indexOp == IndexOperation.UPSERT) {
int operationFieldIndex = propagatedSchema.findVariable(operationVar);
op = new LSMSecondaryUpsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh,
filterFactory, prevFilterFactory, modificationCallbackFactory, operationFieldIndex,
- BinaryIntegerInspector.FACTORY, prevFieldPermutation, partitionerFactory, partitionsMap);
+ BinaryIntegerInspector.FACTORY, prevFieldPermutation, partitionerFactory,
+ partitioningProperties.getComputeStorageMap());
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
- filterFactory, false, modificationCallbackFactory, partitionerFactory, partitionsMap);
+ filterFactory, false, modificationCallbackFactory, partitionerFactory,
+ partitioningProperties.getComputeStorageMap());
}
- return new Pair<>(op, splitsAndConstraint.second);
+ return new Pair<>(op, partitioningProperties.getConstraints());
} catch (Exception e) {
throw new AlgebricksException(e);
}
@@ -1355,19 +1352,17 @@
// Index parameters.
Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), indexName);
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
+
+ PartitioningProperties partitioningProperties =
+ getPartitioningProperties(dataset, secondaryIndex.getIndexName());
// Prepare callback.
IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields);
IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(
- storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
-
- int numPartitions = getNumPartitions(splitsAndConstraint.second);
- int[][] partitionsMap = getPartitionsMap(numPartitions);
+ storageComponentProvider.getStorageManager(), partitioningProperties.getSplitsProvider());
IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
- ITuplePartitionerFactory tuplePartitionerFactory =
- new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
+ ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(pkFields,
+ pkHashFunFactories, partitioningProperties.getNumberOfPartitions());
IOperatorDescriptor op;
if (indexOp == IndexOperation.UPSERT) {
@@ -1375,13 +1370,13 @@
op = new LSMSecondaryUpsertWithNestedPlanOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
idfh, modificationCallbackFactory, operationFieldIndex, BinaryIntegerInspector.FACTORY,
secondaryKeysPipelines.get(0), secondaryKeysPipelines.get(1), tuplePartitionerFactory,
- partitionsMap);
+ partitioningProperties.getComputeStorageMap());
} else {
op = new LSMSecondaryInsertDeleteWithNestedPlanOperatorDescriptor(spec, inputRecordDesc,
fieldPermutation, indexOp, idfh, modificationCallbackFactory, secondaryKeysPipelines.get(0),
- tuplePartitionerFactory, partitionsMap);
+ tuplePartitionerFactory, partitioningProperties.getComputeStorageMap());
}
- return new Pair<>(op, splitsAndConstraint.second);
+ return new Pair<>(op, partitioningProperties.getConstraints());
} catch (Exception e) {
throw new AlgebricksException(e);
}
@@ -1462,20 +1457,18 @@
prevFieldPermutation[numKeys] = idx;
}
}
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
+
+ PartitioningProperties partitioningProperties =
+ getPartitioningProperties(dataset, secondaryIndex.getIndexName());
// prepare callback
IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields);
- IIndexDataflowHelperFactory indexDataflowHelperFactory =
- new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
-
- int numPartitions = getNumPartitions(splitsAndConstraint.second);
- int[][] partitionsMap = getPartitionsMap(numPartitions);
+ IIndexDataflowHelperFactory indexDataflowHelperFactory = new IndexDataflowHelperFactory(
+ storageComponentProvider.getStorageManager(), partitioningProperties.getSplitsProvider());
IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
- ITuplePartitionerFactory partitionerFactory =
- new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
+ ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories,
+ partitioningProperties.getNumberOfPartitions());
IOperatorDescriptor op;
if (bulkload) {
@@ -1483,19 +1476,19 @@
op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false,
indexDataflowHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory,
- partitionerFactory, partitionsMap);
+ partitionerFactory, partitioningProperties.getComputeStorageMap());
} else if (indexOp == IndexOperation.UPSERT) {
int operationFieldIndex = propagatedSchema.findVariable(operationVar);
op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation,
indexDataflowHelperFactory, filterFactory, prevFilterFactory, modificationCallbackFactory,
operationFieldIndex, BinaryIntegerInspector.FACTORY, prevFieldPermutation, partitionerFactory,
- partitionsMap);
+ partitioningProperties.getComputeStorageMap());
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp,
indexDataflowHelperFactory, filterFactory, false, modificationCallbackFactory, partitionerFactory,
- partitionsMap);
+ partitioningProperties.getComputeStorageMap());
}
- return new Pair<>(op, splitsAndConstraint.second);
+ return new Pair<>(op, partitioningProperties.getConstraints());
}
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInvertedIndexModificationRuntime(
@@ -1583,20 +1576,18 @@
// Index parameters.
Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), indexName);
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
+
+ PartitioningProperties partitioningProperties =
+ getPartitioningProperties(dataset, secondaryIndex.getIndexName());
// prepare callback
IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields);
IIndexDataflowHelperFactory indexDataFlowFactory = new IndexDataflowHelperFactory(
- storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
-
- int numPartitions = getNumPartitions(splitsAndConstraint.second);
- int[][] partitionsMap = getPartitionsMap(numPartitions);
+ storageComponentProvider.getStorageManager(), partitioningProperties.getSplitsProvider());
IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
- ITuplePartitionerFactory partitionerFactory =
- new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
+ ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories,
+ partitioningProperties.getNumberOfPartitions());
IOperatorDescriptor op;
if (bulkload) {
@@ -1604,18 +1595,19 @@
op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataFlowFactory,
null, BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory, partitionerFactory,
- partitionsMap);
+ partitioningProperties.getComputeStorageMap());
} else if (indexOp == IndexOperation.UPSERT) {
int upsertOperationFieldIndex = propagatedSchema.findVariable(operationVar);
op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation, indexDataFlowFactory,
filterFactory, prevFilterFactory, modificationCallbackFactory, upsertOperationFieldIndex,
- BinaryIntegerInspector.FACTORY, prevFieldPermutation, partitionerFactory, partitionsMap);
+ BinaryIntegerInspector.FACTORY, prevFieldPermutation, partitionerFactory,
+ partitioningProperties.getComputeStorageMap());
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp,
indexDataFlowFactory, filterFactory, false, modificationCallbackFactory, partitionerFactory,
- partitionsMap);
+ partitioningProperties.getComputeStorageMap());
}
- return new Pair<>(op, splitsAndConstraint.second);
+ return new Pair<>(op, partitioningProperties.getConstraints());
} catch (Exception e) {
throw new AlgebricksException(e);
}
@@ -1757,8 +1749,8 @@
FullTextUtil.fetchFilterAndCreateConfigEvaluator(this, secondaryIndex.getDataverseName(),
secondaryIndexDetails.getFullTextConfigName());
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
+ PartitioningProperties partitioningProperties =
+ getPartitioningProperties(dataset, secondaryIndex.getIndexName());
// Generate Output Record format
ISerializerDeserializer<?>[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
@@ -1800,7 +1792,7 @@
tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec, tokenKeyPairRecDesc, tokenizerFactory,
fullTextConfigEvaluatorFactory, docField, keyFields, isPartitioned, true, false,
MissingWriterFactory.INSTANCE);
- return new Pair<>(tokenizerOp, splitsAndConstraint.second);
+ return new Pair<>(tokenizerOp, partitioningProperties.getConstraints());
} catch (Exception e) {
throw new AlgebricksException(e);
}
@@ -1830,15 +1822,19 @@
return storageComponentProvider;
}
- public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(Dataset ds)
- throws AlgebricksException {
- return getSplitProviderAndConstraints(ds, ds.getDatasetName());
+ public PartitioningProperties getPartitioningProperties(Index idx) throws AlgebricksException {
+ Dataset ds = findDataset(idx.getDataverseName(), idx.getDatasetName());
+ return getPartitioningProperties(ds, idx.getIndexName());
}
- public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(Dataset ds,
- String indexName) throws AlgebricksException {
- FileSplit[] splits = splitsForIndex(mdTxnCtx, ds, indexName);
- return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
+ public PartitioningProperties getPartitioningProperties(Dataset ds) throws AlgebricksException {
+ return getPartitioningProperties(ds, ds.getDatasetName());
+ }
+
+ public PartitioningProperties getPartitioningProperties(Dataset ds, String indexName) throws AlgebricksException {
+ //TODO(partitioning) pass splits rather than mdTxnCtx?
+ // FileSplit[] splits = splitsForIndex(mdTxnCtx, ds, indexName);
+ return dataPartitioningProvider.getPartitioningProperties(mdTxnCtx, ds, indexName);
}
public List<Pair<IFileSplitProvider, String>> getSplitProviderOfAllIndexes(Dataset ds) throws AlgebricksException {
@@ -1898,34 +1894,6 @@
validateDatabaseObjectNameImpl(objectName, sourceLoc);
}
- public int[][] getPartitionsMap(Dataset dataset) throws AlgebricksException {
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = getSplitProviderAndConstraints(dataset);
- return getPartitionsMap(getNumPartitions(spPc.second));
- }
-
- public int[][] getPartitionsMap(Index idx) throws AlgebricksException {
- Dataset ds = findDataset(idx.getDataverseName(), idx.getDatasetName());
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
- getSplitProviderAndConstraints(ds, idx.getIndexName());
- return getPartitionsMap(getNumPartitions(spPc.second));
- }
-
- public static int getNumPartitions(AlgebricksPartitionConstraint constraint) {
- if (constraint.getPartitionConstraintType() == AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) {
- return ((AlgebricksCountPartitionConstraint) constraint).getCount();
- } else {
- return ((AlgebricksAbsolutePartitionConstraint) constraint).getLocations().length;
- }
- }
-
- public static int[][] getPartitionsMap(int numPartitions) {
- int[][] map = new int[numPartitions][1];
- for (int i = 0; i < numPartitions; i++) {
- map[i] = new int[] { i };
- }
- return map;
- }
-
private void validateDatabaseObjectNameImpl(String name, SourceLocation sourceLoc) throws AlgebricksException {
if (name == null || name.isEmpty()) {
throw new AsterixException(ErrorCode.INVALID_DATABASE_OBJECT_NAME, sourceLoc, "");
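Note on the pattern repeated throughout this change: call sites that previously fetched a Pair of split provider and partition constraint, and then separately computed a partitions map, now make a single getPartitioningProperties call and read all three pieces off the returned bundle. Below is a minimal sketch of the call shape using hypothetical stand-ins for the asterix-common types (the real PartitioningProperties class is not shown in this section; accessor names are copied from the patch, including its "Spilts" spelling):

import java.util.Arrays;

// Hypothetical stand-ins, for illustration only.
record Splits(String... files) {}

record Constraints(String... locations) {}

record PartitioningPropertiesSketch(Splits splits, Constraints constraints, int[][] computeStorageMap) {
    Splits getSpiltsProvider() { return splits; }        // file splits per storage partition
    Constraints getConstraints() { return constraints; } // where the job may run
    int[][] getComputeStorageMap() { return computeStorageMap; }
    int getNumberOfPartitions() {
        return Arrays.stream(computeStorageMap).mapToInt(m -> m.length).sum();
    }
}

public class CallShapeDemo {
    public static void main(String[] args) {
        // One bundle replaces the old Pair<IFileSplitProvider, AlgebricksPartitionConstraint>
        // plus the separate getPartitionsMap(dataset) lookup.
        PartitioningPropertiesSketch pp = new PartitioningPropertiesSketch(
                new Splits("idx_p0", "idx_p1"), new Constraints("nc1", "nc2"),
                new int[][] { { 0 }, { 1 } });
        System.out.println(pp.getNumberOfPartitions()); // 2
    }
}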
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
new file mode 100644
index 0000000..f5e96b1
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import static org.apache.asterix.common.utils.PartitioningScheme.DYNAMIC;
+import static org.apache.asterix.common.utils.PartitioningScheme.STATIC;
+
+import org.apache.asterix.common.cluster.PartitioningProperties;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.dataflow.IDataPartitioningProvider;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.utils.PartitioningScheme;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class DataPartitioningProvider implements IDataPartitioningProvider {
+
+ private final ICcApplicationContext appCtx;
+ private final PartitioningScheme scheme;
+
+ public DataPartitioningProvider(ICcApplicationContext appCtx) {
+ this.appCtx = appCtx;
+ scheme = appCtx.getStorageProperties().getPartitioningScheme();
+ }
+
+ public PartitioningProperties splitAndConstraints(DataverseName dataverseName) {
+ if (scheme == DYNAMIC) {
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints = SplitsAndConstraintsUtil
+ .getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(), dataverseName);
+ int[][] partitionsMap = getPartitionsMap(getNumPartitions(splitsAndConstraints.second));
+ return PartitioningProperties.of(splitsAndConstraints.first, splitsAndConstraints.second, partitionsMap);
+ } else if (scheme == STATIC) {
+ throw new NotImplementedException();
+ }
+ throw new IllegalStateException();
+ }
+
+ public PartitioningProperties getPartitioningProperties(MetadataTransactionContext mdTxnCtx, Dataset ds,
+ String indexName) throws AlgebricksException {
+ if (scheme == DYNAMIC) {
+ FileSplit[] splits =
+ SplitsAndConstraintsUtil.getIndexSplits(ds, indexName, mdTxnCtx, appCtx.getClusterStateManager());
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints =
+ StoragePathUtil.splitProviderAndPartitionConstraints(splits);
+ int[][] partitionsMap = getPartitionsMap(getNumPartitions(splitsAndConstraints.second));
+ return PartitioningProperties.of(splitsAndConstraints.first, splitsAndConstraints.second, partitionsMap);
+ } else if (scheme == STATIC) {
+ throw new NotImplementedException();
+ }
+ throw new IllegalStateException();
+ }
+
+ private static int getNumPartitions(AlgebricksPartitionConstraint constraint) {
+ if (constraint.getPartitionConstraintType() == AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) {
+ return ((AlgebricksCountPartitionConstraint) constraint).getCount();
+ } else {
+ return ((AlgebricksAbsolutePartitionConstraint) constraint).getLocations().length;
+ }
+ }
+
+ private static int[][] getPartitionsMap(int numPartitions) {
+ int[][] map = new int[numPartitions][1];
+ for (int i = 0; i < numPartitions; i++) {
+ map[i] = new int[] { i };
+ }
+ return map;
+ }
+}
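For a concrete sense of what the DYNAMIC branch above produces: getNumPartitions resolves a constraint to a partition count (COUNT constraints carry it directly; absolute constraints imply one partition per location), and getPartitionsMap then builds an identity compute-to-storage map, one storage partition per compute partition; the STATIC branch is intentionally left unimplemented. A minimal, self-contained sketch of the map construction in plain Java, with the Hyracks constraint types elided:

import java.util.Arrays;

public class IdentityPartitionsMapDemo {
    // Mirrors DataPartitioningProvider#getPartitionsMap: compute partition i
    // is served by exactly one storage partition, also numbered i.
    static int[][] identityPartitionsMap(int numPartitions) {
        int[][] map = new int[numPartitions][];
        for (int i = 0; i < numPartitions; i++) {
            map[i] = new int[] { i };
        }
        return map;
    }

    public static void main(String[] args) {
        // A 4-partition cluster yields [[0], [1], [2], [3]].
        System.out.println(Arrays.deepToString(identityPartitionsMap(4)));
    }
}

Under an identity map the number of rows equals the total number of storage partitions, which is why the older count()-based computation removed in the test hunks below was only accidentally correct.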
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 89dea40..27e06eb 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -31,6 +31,7 @@
import org.apache.asterix.builders.IARecordBuilder;
import org.apache.asterix.builders.RecordBuilder;
import org.apache.asterix.column.util.ColumnSecondaryIndexSchemaUtil;
+import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.common.config.DatasetConfig;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
@@ -309,15 +310,14 @@
return RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
}
JobSpecification specPrimary = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataset);
- IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
- metadataProvider.getStorageComponentProvider().getStorageManager(), splitsAndConstraint.first);
- int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
- IndexDropOperatorDescriptor primaryBtreeDrop =
- new IndexDropOperatorDescriptor(specPrimary, indexHelperFactory, options, partitionsMap);
+ PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset);
+ IIndexDataflowHelperFactory indexHelperFactory =
+ new IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
+ partitioningProperties.getSpiltsProvider());
+ IndexDropOperatorDescriptor primaryBtreeDrop = new IndexDropOperatorDescriptor(specPrimary, indexHelperFactory,
+ options, partitioningProperties.getComputeStorageMap());
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop,
- splitsAndConstraint.second);
+ partitioningProperties.getConstraints());
specPrimary.addRoot(primaryBtreeDrop);
return specPrimary;
}
@@ -334,14 +334,13 @@
itemType = (ARecordType) metadataProvider.findTypeForDatasetWithoutType(itemType, metaItemType, dataset);
JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataset);
- FileSplit[] fs = splitsAndConstraint.first.getFileSplits();
+ PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset);
+ FileSplit[] fs = partitioningProperties.getSpiltsProvider().getFileSplits();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < fs.length; i++) {
sb.append(fs[i] + " ");
}
- LOGGER.info("CREATING File Splits: " + sb);
+ LOGGER.info("CREATING File Splits: {}", sb);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
// prepare a LocalResourceMetadata which will be stored in NC's local resource
@@ -350,12 +349,11 @@
compactionInfo.first, compactionInfo.second);
IndexBuilderFactory indexBuilderFactory =
new IndexBuilderFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
- splitsAndConstraint.first, resourceFactory, true);
- int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
- IndexCreateOperatorDescriptor indexCreateOp =
- new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, partitionsMap);
+ partitioningProperties.getSpiltsProvider(), resourceFactory, true);
+ IndexCreateOperatorDescriptor indexCreateOp = new IndexCreateOperatorDescriptor(spec, indexBuilderFactory,
+ partitioningProperties.getComputeStorageMap());
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
- splitsAndConstraint.second);
+ partitioningProperties.getConstraints());
spec.addRoot(indexCreateOp);
return spec;
}
@@ -368,16 +366,16 @@
throw new AsterixException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, datasetName, dataverseName);
}
JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataset);
- IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
- metadataProvider.getStorageComponentProvider().getStorageManager(), splitsAndConstraint.first);
+ PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset);
+ IIndexDataflowHelperFactory indexHelperFactory =
+ new IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
+ partitioningProperties.getSpiltsProvider());
LSMTreeIndexCompactOperatorDescriptor compactOp =
new LSMTreeIndexCompactOperatorDescriptor(spec, indexHelperFactory);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
- splitsAndConstraint.second);
+ partitioningProperties.getConstraints());
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
- splitsAndConstraint.second);
+ partitioningProperties.getConstraints());
spec.addRoot(compactOp);
return spec;
}
@@ -398,10 +396,9 @@
public static IOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec, MetadataProvider metadataProvider,
Dataset dataset, ITupleProjectorFactory projectorFactory) throws AlgebricksException {
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataset);
- IFileSplitProvider primaryFileSplitProvider = primarySplitsAndConstraint.first;
- AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second;
+ PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset);
+ IFileSplitProvider primaryFileSplitProvider = partitioningProperties.getSpiltsProvider();
+ AlgebricksPartitionConstraint primaryPartitionConstraint = partitioningProperties.getConstraints();
// -Infinity
int[] lowKeyFields = null;
// +Infinity
@@ -412,11 +409,10 @@
IRecoveryManager.ResourceType.LSM_BTREE);
IndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
metadataProvider.getStorageComponentProvider().getStorageManager(), primaryFileSplitProvider);
- int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec,
dataset.getPrimaryRecordDescriptor(metadataProvider), lowKeyFields, highKeyFields, true, true,
indexHelperFactory, false, false, null, searchCallbackFactory, null, null, false, null, null, -1, false,
- null, null, projectorFactory, null, partitionsMap);
+ null, null, projectorFactory, null, partitioningProperties.getComputeStorageMap());
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
primaryPartitionConstraint);
return primarySearchOp;
@@ -445,8 +441,7 @@
Index primaryIndex = metadataProvider.getIndex(dataset.getDataverseName(), dataset.getDatasetName(),
dataset.getDatasetName());
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataset);
+ PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset);
// prepare callback
int[] primaryKeyFields = new int[numKeys];
@@ -462,8 +457,8 @@
storageComponentProvider, primaryIndex, IndexOperation.UPSERT, primaryKeyFields);
ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
storageComponentProvider, primaryIndex, IndexOperation.UPSERT, primaryKeyFields);
- IIndexDataflowHelperFactory idfh =
- new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
+ IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
+ partitioningProperties.getSpiltsProvider());
LSMPrimaryUpsertOperatorDescriptor op;
ITypeTraits[] outputTypeTraits = new ITypeTraits[inputRecordDesc.getFieldCount() + 1
+ (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
@@ -518,18 +513,15 @@
ARecordType requestedType = getPrevRecordType(metadataProvider, dataset, itemType);
ITupleProjectorFactory projectorFactory = IndexUtil.createUpsertTupleProjectorFactory(
dataset.getDatasetFormatInfo(), requestedType, itemType, metaItemType, numKeys);
-
- int numPartitions = MetadataProvider.getNumPartitions(splitsAndConstraint.second);
- int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions);
IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(metadataProvider);
- ITuplePartitionerFactory tuplePartitionerFactory =
- new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
-
+ ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories,
+ partitioningProperties.getNumberOfPartitions());
op = new LSMPrimaryUpsertOperatorDescriptor(spec, outputRecordDesc, fieldPermutation, idfh,
missingWriterFactory, modificationCallbackFactory, searchCallbackFactory,
dataset.getFrameOpCallbackFactory(metadataProvider), numKeys, filterSourceIndicator, filterItemType,
- fieldIdx, hasSecondaries, projectorFactory, tuplePartitionerFactory, partitionsMap);
- return new Pair<>(op, splitsAndConstraint.second);
+ fieldIdx, hasSecondaries, projectorFactory, tuplePartitionerFactory,
+ partitioningProperties.getComputeStorageMap());
+ return new Pair<>(op, partitioningProperties.getConstraints());
}
/**
@@ -591,9 +583,8 @@
*/
public static IOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec, Dataset dataset,
MetadataProvider metadataProvider) throws AlgebricksException {
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataset);
- AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second;
+ PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset);
+ AlgebricksPartitionConstraint primaryPartitionConstraint = partitioningProperties.getConstraints();
// Build dummy tuple containing one field with a dummy value inside.
ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
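A small cleanup rides along in the hunk above: the file-split log message switches from string concatenation to a parameterized placeholder. Assuming a Log4j2 logger (which the {} syntax suggests), the parameterized form skips message formatting entirely when the level is disabled; a minimal sketch:

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class LogParamDemo {
    private static final Logger LOGGER = LogManager.getLogger(LogParamDemo.class);

    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder("split-0 split-1 ");
        // Eager: builds the full message string even if INFO is disabled.
        // LOGGER.info("CREATING File Splits: " + sb);
        // Lazy: formats only when the INFO level is actually enabled.
        LOGGER.info("CREATING File Splits: {}", sb);
    }
}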
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
index 43b1fb9..b6f0e96 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
@@ -19,11 +19,11 @@
package org.apache.asterix.metadata.utils;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.common.config.OptimizationConfUtil;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.utils.StorageConstants;
@@ -114,6 +114,7 @@
private ILSMMergePolicyFactory mergePolicyFactory;
private Map<String, String> mergePolicyProperties;
private int groupbyNumFrames;
+ private int[][] computeStorageMap;
protected SampleOperationsHelper(Dataset dataset, Index index, MetadataProvider metadataProvider,
SourceLocation sourceLoc) {
@@ -134,11 +135,11 @@
comparatorFactories = dataset.getPrimaryComparatorFactories(metadataProvider, itemType, metaType);
groupbyNumFrames = getGroupByNumFrames(metadataProvider, sourceLoc);
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName());
- fileSplitProvider = secondarySplitsAndConstraint.first;
- partitionConstraint = secondarySplitsAndConstraint.second;
-
+ PartitioningProperties partitioningProperties =
+ metadataProvider.getPartitioningProperties(dataset, index.getIndexName());
+ fileSplitProvider = partitioningProperties.getSpiltsProvider();
+ partitionConstraint = partitioningProperties.getConstraints();
+ computeStorageMap = partitioningProperties.getComputeStorageMap();
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
mergePolicyFactory = compactionInfo.first;
@@ -153,9 +154,8 @@
mergePolicyFactory, mergePolicyProperties);
IIndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory(storageComponentProvider.getStorageManager(),
fileSplitProvider, resourceFactory, true);
- int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
IndexCreateOperatorDescriptor indexCreateOp =
- new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, partitionsMap);
+ new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, computeStorageMap);
indexCreateOp.setSourceLocation(sourceLoc);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp, partitionConstraint);
spec.addRoot(indexCreateOp);
@@ -318,19 +318,18 @@
protected LSMIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor,
long numElementHint) throws AlgebricksException {
- int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
+ PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset);
int[] pkFields = new int[dataset.getPrimaryKeys().size()];
for (int i = 0; i < pkFields.length; i++) {
pkFields[i] = fieldPermutation[i];
}
- int numPartitions = (int) Arrays.stream(partitionsMap).map(partitions -> partitions.length).count();
IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(metadataProvider);
- ITuplePartitionerFactory partitionerFactory =
- new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
+ ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories,
+ partitioningProperties.getNumberOfPartitions());
LSMIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new LSMIndexBulkLoadOperatorDescriptor(spec,
recordDesc, fieldPermutation, fillFactor, false, numElementHint, true, dataflowHelperFactory, null,
LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory,
- partitionsMap);
+ partitioningProperties.getComputeStorageMap());
treeIndexBulkLoadOp.setSourceLocation(sourceLoc);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
partitionConstraint);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index 02c05a9..5d6c13c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -19,13 +19,13 @@
package org.apache.asterix.metadata.utils;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
+import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.OptimizationConfUtil;
import org.apache.asterix.common.exceptions.CompilationException;
@@ -211,10 +211,10 @@
payloadSerde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
metaSerde =
metaType == null ? null : SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType);
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName());
- secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
- secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
+ PartitioningProperties partitioningProperties =
+ metadataProvider.getPartitioningProperties(dataset, index.getIndexName());
+ secondaryFileSplitProvider = partitioningProperties.getSpiltsProvider();
+ secondaryPartitionConstraint = partitioningProperties.getConstraints();
numPrimaryKeys = dataset.getPrimaryKeys().size();
if (dataset.getDatasetType() == DatasetType.INTERNAL) {
filterFieldName = DatasetUtil.getFilterField(dataset);
@@ -223,10 +223,10 @@
} else {
numFilterFields = 0;
}
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataset);
- primaryFileSplitProvider = primarySplitsAndConstraint.first;
- primaryPartitionConstraint = primarySplitsAndConstraint.second;
+
+ PartitioningProperties datasetPartitioningProperties = metadataProvider.getPartitioningProperties(dataset);
+ primaryFileSplitProvider = datasetPartitioningProperties.getSpiltsProvider();
+ primaryPartitionConstraint = datasetPartitioningProperties.getConstraints();
setPrimaryRecDescAndComparators();
}
setSecondaryRecDescAndComparators();
@@ -447,16 +447,15 @@
throws AlgebricksException {
IndexDataflowHelperFactory primaryIndexDataflowHelperFactory = new IndexDataflowHelperFactory(
metadataProvider.getStorageComponentProvider().getStorageManager(), primaryFileSplitProvider);
- int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
- int numPartitions = (int) Arrays.stream(partitionsMap).map(partitions -> partitions.length).count();
+ PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset);
IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(metadataProvider);
- ITuplePartitionerFactory partitionerFactory =
- new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
+ ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories,
+ partitioningProperties.getNumberOfPartitions());
// when an index is being created (not loaded) the filtration is introduced in the pipeline -> no tuple filter
- LSMIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp =
- new LSMIndexBulkLoadOperatorDescriptor(spec, secondaryRecDesc, fieldPermutation, fillFactor, false,
- numElementsHint, false, dataflowHelperFactory, primaryIndexDataflowHelperFactory,
- BulkLoadUsage.CREATE_INDEX, dataset.getDatasetId(), null, partitionerFactory, partitionsMap);
+ LSMIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new LSMIndexBulkLoadOperatorDescriptor(spec,
+ secondaryRecDesc, fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory,
+ primaryIndexDataflowHelperFactory, BulkLoadUsage.CREATE_INDEX, dataset.getDatasetId(), null,
+ partitionerFactory, partitioningProperties.getComputeStorageMap());
treeIndexBulkLoadOp.setSourceLocation(sourceLoc);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
secondaryPartitionConstraint);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
index 4fda6e4..498f3d4 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
@@ -23,19 +23,17 @@
import java.util.Set;
+import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.runtime.utils.RuntimeUtils;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.storage.am.common.api.IIndexBuilderFactory;
import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
@@ -60,9 +58,9 @@
mergePolicyFactory, mergePolicyProperties);
IIndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory(storageComponentProvider.getStorageManager(),
secondaryFileSplitProvider, resourceFactory, true);
- int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
- IndexCreateOperatorDescriptor secondaryIndexCreateOp =
- new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, partitionsMap);
+ PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset);
+ IndexCreateOperatorDescriptor secondaryIndexCreateOp = new IndexCreateOperatorDescriptor(spec,
+ indexBuilderFactory, partitioningProperties.getComputeStorageMap());
secondaryIndexCreateOp.setSourceLocation(sourceLoc);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
secondaryPartitionConstraint);
@@ -79,17 +77,17 @@
static JobSpecification buildDropJobSpecImpl(Dataset dataset, Index index, Set<DropOption> dropOptions,
MetadataProvider metadataProvider, SourceLocation sourceLoc) throws AlgebricksException {
JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName());
- IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(
- metadataProvider.getStorageComponentProvider().getStorageManager(), splitsAndConstraint.first);
+ PartitioningProperties partitioningProperties =
+ metadataProvider.getPartitioningProperties(dataset, index.getIndexName());
+ IIndexDataflowHelperFactory dataflowHelperFactory =
+ new IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
+ partitioningProperties.getSpiltsProvider());
// The index drop operation should be persistent regardless of temp datasets or permanent dataset.
- int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
- IndexDropOperatorDescriptor btreeDrop =
- new IndexDropOperatorDescriptor(spec, dataflowHelperFactory, dropOptions, partitionsMap);
+ IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec, dataflowHelperFactory,
+ dropOptions, partitioningProperties.getComputeStorageMap());
btreeDrop.setSourceLocation(sourceLoc);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop,
- splitsAndConstraint.second);
+ partitioningProperties.getConstraints());
spec.addRoot(btreeDrop);
return spec;
}
@@ -97,10 +95,11 @@
@Override
public JobSpecification buildCompactJobSpec() throws AlgebricksException {
JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName());
- IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(
- metadataProvider.getStorageComponentProvider().getStorageManager(), splitsAndConstraint.first);
+ PartitioningProperties partitioningProperties =
+ metadataProvider.getPartitioningProperties(dataset, index.getIndexName());
+ IIndexDataflowHelperFactory dataflowHelperFactory =
+ new IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
+ partitioningProperties.getSpiltsProvider());
LSMTreeIndexCompactOperatorDescriptor compactOp =
new LSMTreeIndexCompactOperatorDescriptor(spec, dataflowHelperFactory);
compactOp.setSourceLocation(sourceLoc);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
index 1776b09..9d51420 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
@@ -211,7 +211,8 @@
// load secondary index
int[] fieldPermutation = { 3, 0 };
int[][] partitionsMap = TestUtils.getPartitionsMap(1);
- int numPartitions = (int) Arrays.stream(partitionsMap).map(partitions -> partitions.length).count();
+ int numPartitions =
+ Arrays.stream(partitionsMap).mapToInt(partitions -> partitions.length).sum();
ITuplePartitionerFactory tuplePartitionerFactory2 =
new FieldHashPartitionerFactory(secondaryPKFieldPermutationB, primaryHashFunFactories, numPartitions);
TreeIndexBulkLoadOperatorDescriptor secondaryBtreeBulkLoad =
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
index d5ad7da..f1dbe5f 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
@@ -344,7 +344,8 @@
int[] fieldPermutation = { 6, 7, 8, 9, 0 };
int[] pkFields = { 4 };
int[][] partitionsMap = TestUtils.getPartitionsMap(1);
- int numPartitions = (int) Arrays.stream(partitionsMap).map(partitions -> partitions.length).count();
+ int numPartitions =
+ Arrays.stream(partitionsMap).mapToInt(partitions -> partitions.length).sum();
ITuplePartitionerFactory partitionerFactory =
new FieldHashPartitionerFactory(pkFields, primaryHashFactories, numPartitions);
TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoad =
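The numPartitions fix in the two test hunks above deserves a closer look: count() returns the number of rows in the partitions map, not the total number of storage partitions, so the removed (int) ... .count() expression was only accidentally correct for identity maps such as TestUtils.getPartitionsMap(1). The two quantities diverge as soon as one compute partition serves several storage partitions; a self-contained illustration:

import java.util.Arrays;

public class CountVsSumDemo {
    public static void main(String[] args) {
        // Two compute partitions, each backed by two storage partitions.
        int[][] partitionsMap = { { 0, 1 }, { 2, 3 } };

        // Old expression: counts the rows of the map, regardless of row widths.
        int byCount = (int) Arrays.stream(partitionsMap).map(p -> p.length).count();

        // Fixed expression: sums the row widths, i.e. the total storage partitions.
        int bySum = Arrays.stream(partitionsMap).mapToInt(p -> p.length).sum();

        System.out.println(byCount + " vs " + bySum); // prints "2 vs 4"
    }
}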