[ASTERIXDB-3457][FUN] Add query-partition() to get all tuples in a partition
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Add internal query-partition() to get all tuples in a partition.
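For illustration, based on the call forms documented in QueryPartitionRewriter's
javadoc (the exact SQL++ invocation syntax for this internal function is an
assumption; the hyphenated name is assumed to need backtick quoting):

    -- hypothetical: read all tuples of storage partition 0 of dataset dv.ds
    SELECT VALUE t FROM `query-partition`("dv", "ds", 0) AS t;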
Ext-ref: MB-62720
Change-Id: I37185d159a38d26c8cc93ddd6500e437891c44f5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18483
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryPartitionDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryPartitionDatasource.java
new file mode 100644
index 0000000..4d40ba6
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryPartitionDatasource.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.metadata.api.IDatasourceFunction;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionFiltrationInfo;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
+
+public class QueryPartitionDatasource extends FunctionDataSource {
+
+ private final Dataset ds;
+ private final AlgebricksAbsolutePartitionConstraint storageLocations;
+ private final int partitionNum;
+
+ public QueryPartitionDatasource(Dataset ds, INodeDomain domain,
+ AlgebricksAbsolutePartitionConstraint storageLocations, ARecordType recType, int partitionNum)
+ throws AlgebricksException {
+ super(createQueryPartitionDataSourceId(ds), QueryPartitionRewriter.QUERY_PARTITION, domain, recType);
+ if (partitionNum < 0) {
+ throw new IllegalArgumentException("partition must be >= 0");
+ }
+ this.partitionNum = partitionNum;
+ this.ds = ds;
+ this.storageLocations = storageLocations;
+ }
+
+ @Override
+ protected void initSchemaType(IAType iType) {
+ ARecordType type = (ARecordType) iType;
+ IAType[] fieldTypes = type.getFieldTypes();
+ schemaTypes = new IAType[fieldTypes.length];
+ System.arraycopy(fieldTypes, 0, schemaTypes, 0, schemaTypes.length);
+ }
+
+ @Override
+ protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm, MetadataProvider md) {
+ return storageLocations;
+ }
+
+ @Override
+ public boolean isScanAccessPathALeaf() {
+ // The index scan op is not a leaf op; the ETS op below it starts the scan of the index. Return false so
+ // that the ETS op below the index scan is still generated.
+ return false;
+ }
+
+ @Override
+ protected IDatasourceFunction createFunction(MetadataProvider metadataProvider,
+ AlgebricksAbsolutePartitionConstraint locations) {
+ throw new UnsupportedOperationException("query-partition() does not use a record reader adapter");
+ }
+
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDatasourceScanRuntime(
+ MetadataProvider metadataProvider, IDataSource<DataSourceId> dataSource,
+ List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
+ List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars,
+ ITupleFilterFactory tupleFilterFactory, long outputLimit, IOperatorSchema opSchema,
+ IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig,
+ IProjectionFiltrationInfo projectionInfo) throws AlgebricksException {
+ return metadataProvider.getBtreePartitionSearchRuntime(jobSpec, opSchema, typeEnv, context, ds,
+ tupleFilterFactory, outputLimit, partitionNum);
+ }
+
+ @Override
+ public IDataSourcePropertiesProvider getPropertiesProvider() {
+ return new IDataSourcePropertiesProvider() {
+ @Override
+ public IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable> scanVariables,
+ IOptimizationContext ctx) {
+ return StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR;
+ }
+
+ @Override
+ public IPhysicalPropertiesVector computeDeliveredProperties(List<LogicalVariable> scanVariables,
+ IOptimizationContext ctx) {
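+ // Delivered: randomly partitioned across the node domain, with no local structural properties.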
+ List<ILocalStructuralProperty> propsLocal = new ArrayList<>(1);
+ return new StructuralPropertiesVector(new RandomPartitioningProperty(domain), propsLocal);
+ }
+ };
+ }
+
+ private static DataSourceId createQueryPartitionDataSourceId(Dataset dataset) {
+ return new DataSourceId(dataset.getDatabaseName(), dataset.getDataverseName(), dataset.getDatasetName(),
+ new String[] { dataset.getDatasetName(), QueryPartitionRewriter.QUERY_PARTITION.getName() });
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryPartitionRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryPartitionRewriter.java
new file mode 100644
index 0000000..4514a30
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryPartitionRewriter.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import static org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier.VARARGS;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.cluster.PartitioningProperties;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.functions.FunctionConstants;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.MetadataUtil;
+import org.apache.asterix.lang.common.util.FunctionUtil;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.metadata.utils.KeyFieldTypeUtil;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.ConstantExpressionUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+/**
+ * query-partition("db", "dv", "ds", 0);
+ * query-partition("dv", "ds", 0);
+ */
+public class QueryPartitionRewriter extends FunctionRewriter implements IResultTypeComputer {
+
+ public static final FunctionIdentifier QUERY_PARTITION = FunctionConstants.newAsterix("query-partition", VARARGS);
+ public static final QueryPartitionRewriter INSTANCE = new QueryPartitionRewriter(QUERY_PARTITION);
+
+ private QueryPartitionRewriter(FunctionIdentifier functionId) {
+ super(functionId);
+ }
+
+ @Override
+ public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env, IMetadataProvider<?, ?> mp)
+ throws AlgebricksException {
+ return computeRecType((AbstractFunctionCallExpression) expression, (MetadataProvider) mp, null, null, null);
+ }
+
+ @Override
+ public FunctionDataSource toDatasource(IOptimizationContext ctx, AbstractFunctionCallExpression f)
+ throws AlgebricksException {
+ final SourceLocation loc = f.getSourceLocation();
+ int numArgs = f.getArguments().size();
+ int nextArg = 0;
+ if (numArgs > 3) {
+ nextArg++;
+ }
+ DataverseName dvName = getDataverseName(loc, f.getArguments(), nextArg++);
+ String dsName = getString(loc, f.getArguments(), nextArg++);
+ Long partitionNum = ConstantExpressionUtil.getLongArgument(f, nextArg);
+ if (partitionNum == null) {
+ throw new IllegalArgumentException("partition number must be an integer constant");
+ }
+ String dbName;
+ if (numArgs > 3) {
+ dbName = getString(loc, f.getArguments(), 0);
+ } else {
+ dbName = MetadataUtil.databaseFor(dvName);
+ }
+ MetadataProvider mp = (MetadataProvider) ctx.getMetadataProvider();
+ final Dataset dataset = validateDataset(mp, dbName, dvName, dsName, loc);
+ return createQueryPartitionDatasource(mp, dataset, partitionNum.intValue(), loc, f);
+ }
+
+ @Override
+ protected void createDataScanOp(Mutable<ILogicalOperator> opRef, UnnestOperator unnest, IOptimizationContext ctx,
+ AbstractFunctionCallExpression f) throws AlgebricksException {
+ FunctionDataSource datasource = toDatasource(ctx, f);
+ List<LogicalVariable> variables = new ArrayList<>();
+ List<Mutable<ILogicalExpression>> closedRecArgs = new ArrayList<>();
+ MetadataProvider mp = (MetadataProvider) ctx.getMetadataProvider();
+ computeRecType(f, mp, variables, closedRecArgs, ctx);
+ DataSourceScanOperator scan = new DataSourceScanOperator(variables, datasource);
+ scan.setSourceLocation(unnest.getSourceLocation());
+ List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
+ scanInpList.addAll(unnest.getInputs());
+ ScalarFunctionCallExpression recordCreationFunc = new ScalarFunctionCallExpression(
+ FunctionUtil.getFunctionInfo(BuiltinFunctions.CLOSED_RECORD_CONSTRUCTOR), closedRecArgs);
+ recordCreationFunc.setSourceLocation(unnest.getSourceLocation());
+ AssignOperator assignOp = new AssignOperator(unnest.getVariable(), new MutableObject<>(recordCreationFunc));
+ assignOp.getInputs().add(new MutableObject<>(scan));
+ assignOp.setSourceLocation(unnest.getSourceLocation());
+ ctx.computeAndSetTypeEnvironmentForOperator(scan);
+ ctx.computeAndSetTypeEnvironmentForOperator(assignOp);
+ opRef.setValue(assignOp);
+ }
+
+ @Override
+ protected boolean invalidArgs(List<Mutable<ILogicalExpression>> args) {
+ return args.size() < 3;
+ }
+
+ private FunctionDataSource createQueryPartitionDatasource(MetadataProvider mp, Dataset ds, int partitionNum,
+ SourceLocation loc, AbstractFunctionCallExpression f) throws AlgebricksException {
+ INodeDomain domain = mp.findNodeDomain(ds.getNodeGroupName());
+ PartitioningProperties partitioningProperties = mp.getPartitioningProperties(ds);
+ AlgebricksPartitionConstraint constraints = partitioningProperties.getConstraints();
+ ARecordType recType = computeRecType(f, mp, null, null, null);
+ return new QueryPartitionDatasource(ds, domain, (AlgebricksAbsolutePartitionConstraint) constraints, recType,
+ partitionNum);
+ }
+
+ private ARecordType computeRecType(AbstractFunctionCallExpression f, MetadataProvider metadataProvider,
+ List<LogicalVariable> outVars, List<Mutable<ILogicalExpression>> closedRecArgs,
+ IOptimizationContext context) throws AlgebricksException {
+ final SourceLocation loc = f.getSourceLocation();
+ int numArgs = f.getArguments().size();
+ int nextArg = 0;
+ if (numArgs > 3) {
+ nextArg++;
+ }
+ DataverseName dvName = getDataverseName(loc, f.getArguments(), nextArg++);
+ String dsName = getString(loc, f.getArguments(), nextArg++);
+ String dbName;
+ if (numArgs > 3) {
+ dbName = getString(loc, f.getArguments(), 0);
+ } else {
+ dbName = MetadataUtil.databaseFor(dvName);
+ }
+ Dataset dataset = validateDataset(metadataProvider, dbName, dvName, dsName, loc);
+ ARecordType dsType = (ARecordType) metadataProvider.findType(dataset);
+ ARecordType metaType = DatasetUtil.getMetaType(metadataProvider, dataset);
+ dsType = (ARecordType) metadataProvider.findTypeForDatasetWithoutType(dsType, metaType, dataset);
+
+ List<IAType> dsKeyTypes = KeyFieldTypeUtil.getPartitoningKeyTypes(dataset, dsType, metaType);
+ List<List<String>> primaryKeys = dataset.getPrimaryKeys();
+ int numPrimaryKeys = dsKeyTypes.size();
+ int numPayload = metaType == null ? 1 : 2;
+ String[] fieldNames = new String[numPrimaryKeys + numPayload];
+ IAType[] fieldTypes = new IAType[numPrimaryKeys + numPayload];
+ int keyIdx = 0;
+ for (int k = 0; k < numPrimaryKeys; k++, keyIdx++) {
+ fieldTypes[keyIdx] = dsKeyTypes.get(k);
+ fieldNames[keyIdx] = StringUtils.join(primaryKeys.get(k), ".");
+ setAssignVarsExprs(outVars, closedRecArgs, context, loc, fieldNames, keyIdx);
+ }
+ fieldTypes[keyIdx] = dsType;
+ fieldNames[keyIdx] = "rec";
+ setAssignVarsExprs(outVars, closedRecArgs, context, loc, fieldNames, keyIdx);
+ if (metaType != null) {
+ keyIdx++;
+ fieldTypes[keyIdx] = metaType;
+ fieldNames[keyIdx] = "meta";
+ setAssignVarsExprs(outVars, closedRecArgs, context, loc, fieldNames, keyIdx);
+ }
+ return new ARecordType("", fieldNames, fieldTypes, false);
+ }
+
+ private void setAssignVarsExprs(List<LogicalVariable> outVars, List<Mutable<ILogicalExpression>> closedRecArgs,
+ IOptimizationContext context, SourceLocation loc, String[] fieldNames, int n) {
+ if (context != null) {
+ LogicalVariable logicalVariable = context.newVar();
+ outVars.add(logicalVariable);
+ ConstantExpression nameExpr = new ConstantExpression(new AsterixConstantValue(new AString(fieldNames[n])));
+ VariableReferenceExpression varRefExpr = new VariableReferenceExpression(logicalVariable);
+ nameExpr.setSourceLocation(loc);
+ varRefExpr.setSourceLocation(loc);
+ closedRecArgs.add(new MutableObject<>(nameExpr));
+ closedRecArgs.add(new MutableObject<>(varRefExpr));
+ }
+ }
+
+ private static Dataset validateDataset(MetadataProvider mp, String dbName, DataverseName dvName, String dsName,
+ SourceLocation loc) throws AlgebricksException {
+ Dataset dataset = mp.findDataset(dbName, dvName, dsName);
+ if (dataset == null) {
+ throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, loc, dsName,
+ MetadataUtil.dataverseName(dbName, dvName, mp.isUsingDatabase()));
+ }
+ return dataset;
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
index 5a2ef3c..9d39088 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
@@ -27,6 +27,7 @@
import org.apache.asterix.app.function.JobSummariesRewriter;
import org.apache.asterix.app.function.PingRewriter;
import org.apache.asterix.app.function.QueryIndexRewriter;
+import org.apache.asterix.app.function.QueryPartitionRewriter;
import org.apache.asterix.app.function.StorageComponentsRewriter;
import org.apache.asterix.app.function.TPCDSAllTablesDataGeneratorRewriter;
import org.apache.asterix.app.function.TPCDSSingleTableDataGeneratorRewriter;
@@ -100,6 +101,11 @@
BuiltinFunctions.addFunction(QueryIndexRewriter.QUERY_INDEX, QueryIndexRewriter.INSTANCE, true);
BuiltinFunctions.addUnnestFun(QueryIndexRewriter.QUERY_INDEX, false);
BuiltinFunctions.addDatasourceFunction(QueryIndexRewriter.QUERY_INDEX, QueryIndexRewriter.INSTANCE);
+ // Query partition function
+ BuiltinFunctions.addPrivateFunction(QueryPartitionRewriter.QUERY_PARTITION, QueryPartitionRewriter.INSTANCE,
+ true);
+ BuiltinFunctions.addUnnestFun(QueryPartitionRewriter.QUERY_PARTITION, false);
+ BuiltinFunctions.addDatasourceFunction(QueryPartitionRewriter.QUERY_PARTITION, QueryPartitionRewriter.INSTANCE);
}
private MetadataBuiltinFunctions() {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 023b01c..fe2127c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -164,12 +164,14 @@
import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
+import org.apache.hyracks.storage.am.btree.dataflow.BTreePartitionSearchOperatorDescriptor;
import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeBatchPointSearchOperatorDescriptor;
import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
@@ -596,6 +598,15 @@
return new Triple<>(feedIngestor, partitionConstraint, adapterFactory);
}
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBtreePartitionSearchRuntime(
+ JobSpecification jobSpec, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context,
+ Dataset dataset, ITupleFilterFactory tupleFilterFactory, long outputLimit, int partitionNum)
+ throws AlgebricksException {
+ return getBtreeSearchRuntime(jobSpec, opSchema, typeEnv, context, true, false, null, dataset,
+ dataset.getDatasetName(), null, null, true, true, false, null, null, null, tupleFilterFactory,
+ outputLimit, false, false, DefaultTupleProjectorFactory.INSTANCE, false, partitionNum);
+ }
+
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBtreeSearchRuntime(JobSpecification jobSpec,
IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, boolean retainInput,
boolean retainMissing, IMissingWriterFactory nonMatchWriterFactory, Dataset dataset, String indexName,
@@ -604,6 +615,21 @@
int[] maxFilterFieldIndexes, ITupleFilterFactory tupleFilterFactory, long outputLimit,
boolean isIndexOnlyPlan, boolean isPrimaryIndexPointSearch, ITupleProjectorFactory tupleProjectorFactory,
boolean partitionInputTuples) throws AlgebricksException {
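+ // Delegates with targetPartition = -1, i.e. no single-partition restriction (a regular BTree search).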
+ return getBtreeSearchRuntime(jobSpec, opSchema, typeEnv, context, retainInput, retainMissing,
+ nonMatchWriterFactory, dataset, indexName, lowKeyFields, highKeyFields, lowKeyInclusive,
+ highKeyInclusive, propagateFilter, nonFilterWriterFactory, minFilterFieldIndexes, maxFilterFieldIndexes,
+ tupleFilterFactory, outputLimit, isIndexOnlyPlan, isPrimaryIndexPointSearch, tupleProjectorFactory,
+ partitionInputTuples, -1);
+ }
+
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBtreeSearchRuntime(JobSpecification jobSpec,
+ IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, boolean retainInput,
+ boolean retainMissing, IMissingWriterFactory nonMatchWriterFactory, Dataset dataset, String indexName,
+ int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
+ boolean propagateFilter, IMissingWriterFactory nonFilterWriterFactory, int[] minFilterFieldIndexes,
+ int[] maxFilterFieldIndexes, ITupleFilterFactory tupleFilterFactory, long outputLimit,
+ boolean isIndexOnlyPlan, boolean isPrimaryIndexPointSearch, ITupleProjectorFactory tupleProjectorFactory,
+ boolean partitionInputTuples, int targetPartition) throws AlgebricksException {
boolean isSecondary = true;
Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDatabaseName(),
dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName());
@@ -678,12 +704,19 @@
retainMissing, nonMatchWriterFactory, searchCallbackFactory, minFilterFieldIndexes,
maxFilterFieldIndexes, tupleFilterFactory, outputLimit, tupleProjectorFactory,
tuplePartitionerFactory, partitionsMap)
- : new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields, highKeyFields,
- lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput, retainMissing,
- nonMatchWriterFactory, searchCallbackFactory, minFilterFieldIndexes, maxFilterFieldIndexes,
- propagateFilter, nonFilterWriterFactory, tupleFilterFactory, outputLimit,
- proceedIndexOnlyPlan, failValueForIndexOnlyPlan, successValueForIndexOnlyPlan,
- tupleProjectorFactory, tuplePartitionerFactory, partitionsMap);
+ : targetPartition < 0 ? new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields,
+ highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput,
+ retainMissing, nonMatchWriterFactory, searchCallbackFactory, minFilterFieldIndexes,
+ maxFilterFieldIndexes, propagateFilter, nonFilterWriterFactory, tupleFilterFactory,
+ outputLimit, proceedIndexOnlyPlan, failValueForIndexOnlyPlan, successValueForIndexOnlyPlan,
+ tupleProjectorFactory, tuplePartitionerFactory, partitionsMap)
+ : new BTreePartitionSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields,
+ highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput,
+ retainMissing, nonMatchWriterFactory, searchCallbackFactory, minFilterFieldIndexes,
+ maxFilterFieldIndexes, propagateFilter, nonFilterWriterFactory, tupleFilterFactory,
+ outputLimit, proceedIndexOnlyPlan, failValueForIndexOnlyPlan,
+ successValueForIndexOnlyPlan, tupleProjectorFactory, tuplePartitionerFactory,
+ partitionsMap, targetPartition);
} else {
btreeSearchOp = null;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
index 52cd6cf..f3881e2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
@@ -110,6 +110,10 @@
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil-core</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorDescriptor.java
new file mode 100644
index 0000000..dfa59b7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorDescriptor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.btree.dataflow;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
+
+public class BTreePartitionSearchOperatorDescriptor extends BTreeSearchOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ private final int targetStoragePartition;
+
+ public BTreePartitionSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
+ int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
+ IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing,
+ IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory,
+ int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, boolean appendIndexFilter,
+ IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory tupleFilterFactory, long outputLimit,
+ boolean appendOpCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
+ byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory tupleProjectorFactory,
+ ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap, int targetStoragePartition) {
+ super(spec, outRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory,
+ retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, minFilterFieldIndexes,
+ maxFilterFieldIndexes, appendIndexFilter, nonFilterWriterFactory, tupleFilterFactory, outputLimit,
+ appendOpCallbackProceedResult, searchCallbackProceedResultFalseValue,
+ searchCallbackProceedResultTrueValue, tupleProjectorFactory, tuplePartitionerFactory, partitionsMap);
+ this.targetStoragePartition = targetStoragePartition;
+ }
+
+ @Override
+ public BTreeSearchOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ return new BTreePartitionSearchOperatorNodePushable(ctx, partition,
+ recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), lowKeyFields, highKeyFields,
+ lowKeyInclusive, highKeyInclusive, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
+ retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter,
+ nonFilterWriterFactory, tupleFilterFactory, outputLimit, appendOpCallbackProceedResult,
+ searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue, tupleProjectorFactory,
+ tuplePartitionerFactory, partitionsMap, targetStoragePartition);
+ }
+
+ @Override
+ public String getDisplayName() {
+ return "BTree Partition Search";
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorNodePushable.java
new file mode 100644
index 0000000..7518791
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorNodePushable.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.btree.dataflow;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
+
+public class BTreePartitionSearchOperatorNodePushable extends BTreeSearchOperatorNodePushable {
+
+ private final int pIdx;
+
+ public BTreePartitionSearchOperatorNodePushable(IHyracksTaskContext ctx, int partition,
+ RecordDescriptor inputRecDesc, int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive,
+ boolean highKeyInclusive, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes,
+ IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing,
+ IMissingWriterFactory nonMatchWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory,
+ boolean appendIndexFilter, IMissingWriterFactory nonFilterWriterFactory,
+ ITupleFilterFactory tupleFilterFactory, long outputLimit, boolean appendOpCallbackProceedResult,
+ byte[] searchCallbackProceedResultFalseValue, byte[] searchCallbackProceedResultTrueValue,
+ ITupleProjectorFactory projectorFactory, ITuplePartitionerFactory tuplePartitionerFactory,
+ int[][] partitionsMap, int targetStoragePartition) throws HyracksDataException {
+ super(ctx, partition, inputRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
+ minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory, retainInput, retainMissing,
+ nonMatchWriterFactory, searchCallbackFactory, appendIndexFilter, nonFilterWriterFactory,
+ tupleFilterFactory, outputLimit, appendOpCallbackProceedResult, searchCallbackProceedResultFalseValue,
+ searchCallbackProceedResultTrueValue, projectorFactory, tuplePartitionerFactory, partitionsMap);
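+ // Map the target storage partition to its local cursor index; Integer.MIN_VALUE means this node does not host it.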
+ pIdx = storagePartitionId2Index.getOrDefault(targetStoragePartition, Integer.MIN_VALUE);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ try {
+ searchPartition(tupleCount);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ private void searchPartition(int tupleCount) throws Exception {
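+ // Search only the target partition's index; if this node does not host it (pIdx out of range), emit nothing.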
+ if (pIdx >= 0 && pIdx < cursors.length) {
+ for (int i = 0; i < tupleCount && !finished; i++) {
+ resetSearchPredicate(i);
+ cursors[pIdx].close();
+ indexAccessors[pIdx].search(cursors[pIdx], searchPred);
+ writeSearchResults(i, cursors[pIdx]);
+ }
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 91b87c6..da5df23 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -120,7 +120,7 @@
protected final ITupleProjector tupleProjector;
protected final ITuplePartitioner tuplePartitioner;
protected final int[] partitions;
- private final Int2IntMap storagePartitionId2Index = new Int2IntOpenHashMap();
+ protected final Int2IntMap storagePartitionId2Index = new Int2IntOpenHashMap();
public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition,
int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory,