[ASTERIXDB-3144][RT] Pass partitions map to inverted index
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Pass the partitions map to the inverted index search runtime.
- Rename a few runtime-builder methods for consistency (e.g., buildBtreeRuntime -> getBtreeSearchRuntime, getIndexInsertOrDeleteOrUpsertRuntime -> getIndexModificationRuntime).
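For context, the partitions map used here is the identity mapping produced by
MetadataProvider.getPartitionsMap: runtime partition i reads storage partition i.
A minimal Java sketch of how the map is built and consumed, mirroring
MetadataProvider.getPartitionsMap and IndexSearchOperatorNodePushable in this
change (the helper name resolvePartitions is hypothetical, chosen for
illustration):

    // Identity map: runtime partition i -> storage partition i.
    static int[][] getPartitionsMap(int numPartitions) {
        int[][] map = new int[numPartitions][1];
        for (int i = 0; i < numPartitions; i++) {
            map[i] = new int[] { i };
        }
        return map;
    }

    // Hypothetical helper; the real logic lives in the constructor of
    // IndexSearchOperatorNodePushable. A null map falls back to the
    // runtime partition itself.
    static int[] resolvePartitions(int[][] partitionsMap, int partition) {
        return partitionsMap != null ? partitionsMap[partition] : new int[] { partition };
    }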
Change-Id: I6ad1b0cd79f0f5e8e15da83330b8a52f9ac0108d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17463
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
index a7a3838..b8eba74 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -162,7 +162,7 @@
String.valueOf(unnestMap.getOperatorTag()));
}
- Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> btreeSearch = metadataProvider.buildBtreeRuntime(
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> btreeSearch = metadataProvider.getBtreeSearchRuntime(
builder.getJobSpec(), opSchema, typeEnv, context, jobGenParams.getRetainInput(), retainMissing,
nonMatchWriterFactory, dataset, jobGenParams.getIndexName(), lowKeyIndexes, highKeyIndexes,
jobGenParams.isLowKeyInclusive(), jobGenParams.isHighKeyInclusive(), propagateFilter,
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 00eef69..5bdb2db 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -186,14 +186,17 @@
IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(
metadataProvider.getStorageComponentProvider().getStorageManager(), secondarySplitsAndConstraint.first);
- LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp = new LSMInvertedIndexSearchOperatorDescriptor(
- jobSpec, outputRecDesc, queryField, dataflowHelperFactory, queryTokenizerFactory,
- fullTextConfigEvaluatorFactory, searchModifierFactory, retainInput, retainMissing,
- nonMatchWriterFactory,
- dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(), secondaryIndex,
- IndexOperation.SEARCH, null),
- minFilterFieldIndexes, maxFilterFieldIndexes, isFullTextSearchQuery, numPrimaryKeys,
- propagateIndexFilter, nonFilterWriterFactory, frameLimit);
+ int numPartitions = MetadataProvider.getNumPartitions(secondarySplitsAndConstraint.second);
+ int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions);
+
+ LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp =
+ new LSMInvertedIndexSearchOperatorDescriptor(jobSpec, outputRecDesc, queryField, dataflowHelperFactory,
+ queryTokenizerFactory, fullTextConfigEvaluatorFactory, searchModifierFactory, retainInput,
+ retainMissing, nonMatchWriterFactory,
+ dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(), secondaryIndex,
+ IndexOperation.SEARCH, null),
+ minFilterFieldIndexes, maxFilterFieldIndexes, isFullTextSearchQuery, numPrimaryKeys,
+ propagateIndexFilter, nonFilterWriterFactory, frameLimit, partitionsMap);
return new Pair<>(invIndexSearchOp, secondarySplitsAndConstraint.second);
}
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
index 6534ebe..6b5adea 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
@@ -106,7 +106,7 @@
}
Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> rtreeSearch =
- mp.buildRtreeRuntime(builder.getJobSpec(), outputVars, opSchema, typeEnv, context,
+ mp.getRtreeSearchRuntime(builder.getJobSpec(), outputVars, opSchema, typeEnv, context,
jobGenParams.getRetainInput(), retainMissing, nonMatchWriterFactory, dataset,
jobGenParams.getIndexName(), keyIndexes, propagateIndexFilter, nonFilterWriterFactory,
minFilterFieldIndexes, maxFilterFieldIndexes, unnestMap.getGenerateCallBackProceedResultVar());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
index f962142..fda3845 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
@@ -105,9 +105,9 @@
IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig,
IProjectionFiltrationInfo<?> projectionInfo, IProjectionFiltrationInfo<?> metaProjectionInfo)
throws AlgebricksException {
- return metadataProvider.buildBtreeRuntime(jobSpec, opSchema, typeEnv, context, true, false, null, ds, indexName,
- null, null, true, true, false, null, null, null, tupleFilterFactory, outputLimit, false, false,
- DefaultTupleProjectorFactory.INSTANCE, false);
+ return metadataProvider.getBtreeSearchRuntime(jobSpec, opSchema, typeEnv, context, true, false, null, ds,
+ indexName, null, null, true, true, false, null, null, null, tupleFilterFactory, outputLimit, false,
+ false, DefaultTupleProjectorFactory.INSTANCE, false);
}
@Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index dcd52a0..07500f7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -142,7 +142,7 @@
IOperatorDescriptor feedIngestor;
AlgebricksPartitionConstraint ingesterPc;
Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, ITypedAdapterFactory> t =
- metadataProvider.buildFeedIntakeRuntime(spec, feed, policyAccessor);
+ metadataProvider.getFeedIntakeRuntime(spec, feed, policyAccessor);
feedIngestor = t.first;
ingesterPc = t.second;
adapterFactory = t.third;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index c77e032..89e99fd 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -139,7 +139,7 @@
properties.put(KEY_EXTERNAL_SCAN_BUFFER_SIZE, String.valueOf(externalScanBufferSize));
ITypedAdapterFactory adapterFactory = metadataProvider.getConfiguredAdapterFactory(externalDataset,
edd.getAdapter(), properties, (ARecordType) itemType, null, context.getWarningCollector());
- return metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory,
+ return metadataProvider.getExternalDatasetScanRuntime(jobSpec, itemType, adapterFactory,
tupleFilterFactory, outputLimit);
case INTERNAL:
DataSourceId id = getId();
@@ -163,7 +163,7 @@
int[] minFilterFieldIndexes = createFilterIndexes(minFilterVars, opSchema);
int[] maxFilterFieldIndexes = createFilterIndexes(maxFilterVars, opSchema);
- return metadataProvider.buildBtreeRuntime(jobSpec, opSchema, typeEnv, context, true, false, null,
+ return metadataProvider.getBtreeSearchRuntime(jobSpec, opSchema, typeEnv, context, true, false, null,
((DatasetDataSource) dataSource).getDataset(), primaryIndex.getIndexName(), null, null, true,
true, false, null, minFilterFieldIndexes, maxFilterFieldIndexes, tupleFilterFactory,
outputLimit, false, false, tupleProjectorFactory, false);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
index 58377f4..9f7d567 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
@@ -115,8 +115,8 @@
dataParserFactory.setRecordType(RecordUtil.FULLY_OPEN_RECORD_TYPE);
dataParserFactory.configure(Collections.emptyMap());
adapterFactory.configure(factory, dataParserFactory);
- return metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory,
- tupleFilterFactory, outputLimit);
+ return metadataProvider.getExternalDatasetScanRuntime(jobSpec, itemType, adapterFactory, tupleFilterFactory,
+ outputLimit);
}
protected abstract IDatasourceFunction createFunction(MetadataProvider metadataProvider,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
index c7ccc53..fc65c4b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
@@ -147,7 +147,7 @@
ITypedAdapterFactory adapterFactory = metadataProvider.getConfiguredAdapterFactory(alds.getTargetDataset(),
alds.getAdapter(), alds.getAdapterProperties(), itemType, null, context.getWarningCollector());
RecordDescriptor rDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
- return metadataProvider.buildLoadableDatasetScan(jobSpec, adapterFactory, rDesc);
+ return metadataProvider.getLoadableDatasetScanRuntime(jobSpec, adapterFactory, rDesc);
}
@Override
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index debf60e..4862389 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -188,7 +188,6 @@
private ResultSetId resultSetId;
private Counter resultSetIdCounter;
private TxnId txnId;
- private Map<String, Integer> externalDataLocks;
private boolean blockingOperatorDisabled = false;
public static MetadataProvider create(ICcApplicationContext appCtx, Dataverse defaultDataverse) {
@@ -326,14 +325,6 @@
return storageProperties;
}
- public Map<String, Integer> getExternalDataLocks() {
- return externalDataLocks;
- }
-
- public void setExternalDataLocks(Map<String, Integer> locks) {
- this.externalDataLocks = locks;
- }
-
private DataverseName getActiveDataverseName(DataverseName dataverseName) {
return dataverseName != null ? dataverseName
: defaultDataverse != null ? defaultDataverse.getDataverseName() : null;
@@ -496,7 +487,7 @@
context, jobSpec, implConfig, projectionInfo, metaProjectionInfo);
}
- protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan(
+ protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getLoadableDatasetScanRuntime(
JobSpecification jobSpec, ITypedAdapterFactory adapterFactory, RecordDescriptor rDesc)
throws AlgebricksException {
ExternalScanOperatorDescriptor dataScanner = new ExternalScanOperatorDescriptor(jobSpec, rDesc, adapterFactory);
@@ -511,7 +502,7 @@
return MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
}
- public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, ITypedAdapterFactory> buildFeedIntakeRuntime(
+ public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, ITypedAdapterFactory> getFeedIntakeRuntime(
JobSpecification jobSpec, Feed feed, FeedPolicyAccessor policyAccessor) throws Exception {
Triple<ITypedAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> factoryOutput;
factoryOutput =
@@ -539,7 +530,7 @@
return new Triple<>(feedIngestor, partitionConstraint, adapterFactory);
}
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBtreeSearchRuntime(JobSpecification jobSpec,
IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, boolean retainInput,
boolean retainMissing, IMissingWriterFactory nonMatchWriterFactory, Dataset dataset, String indexName,
int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
@@ -633,23 +624,7 @@
return new Pair<>(btreeSearchOp, spPc.second);
}
- public static int getNumPartitions(AlgebricksPartitionConstraint constraint) {
- if (constraint.getPartitionConstraintType() == AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) {
- return ((AlgebricksCountPartitionConstraint) constraint).getCount();
- } else {
- return ((AlgebricksAbsolutePartitionConstraint) constraint).getLocations().length;
- }
- }
-
- public static int[][] getPartitionsMap(int numPartitions) {
- int[][] map = new int[numPartitions][1];
- for (int i = 0; i < numPartitions; i++) {
- map[i] = new int[] { i };
- }
- return map;
- }
-
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(JobSpecification jobSpec,
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRtreeSearchRuntime(JobSpecification jobSpec,
List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
JobGenContext context, boolean retainInput, boolean retainMissing,
IMissingWriterFactory nonMatchWriterFactory, Dataset dataset, String indexName, int[] keyFields,
@@ -809,6 +784,53 @@
}
@Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(
+ IDataSource<DataSourceId> dataSource, IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv,
+ List<LogicalVariable> primaryKeys, LogicalVariable payload, List<LogicalVariable> filterKeys,
+ List<LogicalVariable> additionalNonFilterFields, RecordDescriptor recordDesc, JobGenContext context,
+ JobSpecification spec) throws AlgebricksException {
+ DataverseName dataverseName = dataSource.getId().getDataverseName();
+ String datasetName = dataSource.getId().getDatasourceName();
+ Dataset dataset = findDataset(dataverseName, datasetName);
+ if (dataset == null) {
+ throw new AsterixException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, datasetName, dataverseName);
+ }
+ int numKeys = primaryKeys.size();
+ int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1;
+ int numOfAdditionalFields = additionalNonFilterFields == null ? 0 : additionalNonFilterFields.size();
+ // Move key fields to front. [keys, record, filters]
+ int[] fieldPermutation = new int[numKeys + 1 + numFilterFields + numOfAdditionalFields];
+ int[] bloomFilterKeyFields = new int[numKeys];
+ int i = 0;
+ // set the keys' permutations
+ for (LogicalVariable varKey : primaryKeys) {
+ int idx = inputSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ bloomFilterKeyFields[i] = i;
+ i++;
+ }
+ // set the record permutation
+ fieldPermutation[i++] = inputSchema.findVariable(payload);
+
+ // set the meta record permutation
+ if (additionalNonFilterFields != null) {
+ for (LogicalVariable var : additionalNonFilterFields) {
+ int idx = inputSchema.findVariable(var);
+ fieldPermutation[i++] = idx;
+ }
+ }
+
+ // set the filters' permutations.
+ if (numFilterFields > 0) {
+ int idx = inputSchema.findVariable(filterKeys.get(0));
+ fieldPermutation[i++] = idx;
+ }
+
+ return createPrimaryIndexUpsertOp(spec, this, dataset, recordDesc, fieldPermutation,
+ context.getMissingWriterFactory());
+ }
+
+ @Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
IDataSourceIndex<String, DataSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
@@ -816,9 +838,9 @@
ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
boolean bulkload, List<List<AlgebricksPipeline>> secondaryKeysPipelines, IOperatorSchema pipelineTopSchema)
throws AlgebricksException {
- return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.INSERT, dataSourceIndex, propagatedSchema,
- inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, null, recordDesc,
- context, spec, bulkload, null, null, null, secondaryKeysPipelines, pipelineTopSchema);
+ return getIndexModificationRuntime(IndexOperation.INSERT, dataSourceIndex, propagatedSchema, inputSchemas,
+ typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, null, recordDesc, context,
+ spec, bulkload, null, null, null, secondaryKeysPipelines, pipelineTopSchema);
}
@Override
@@ -829,9 +851,9 @@
ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
List<List<AlgebricksPipeline>> secondaryKeysPipelines, IOperatorSchema pipelineTopSchema)
throws AlgebricksException {
- return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema,
- inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, null, recordDesc,
- context, spec, false, null, null, null, secondaryKeysPipelines, pipelineTopSchema);
+ return getIndexModificationRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema, inputSchemas,
+ typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, null, recordDesc, context,
+ spec, false, null, null, null, secondaryKeysPipelines, pipelineTopSchema);
}
@Override
@@ -843,9 +865,9 @@
List<LogicalVariable> prevSecondaryKeys, LogicalVariable prevAdditionalFilteringKey,
RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
List<List<AlgebricksPipeline>> secondaryKeysPipelines) throws AlgebricksException {
- return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.UPSERT, dataSourceIndex, propagatedSchema,
- inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, prevFilterExpr,
- recordDesc, context, spec, false, operationVar, prevSecondaryKeys, prevAdditionalFilteringKey,
+ return getIndexModificationRuntime(IndexOperation.UPSERT, dataSourceIndex, propagatedSchema, inputSchemas,
+ typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, prevFilterExpr, recordDesc,
+ context, spec, false, operationVar, prevSecondaryKeys, prevAdditionalFilteringKey,
secondaryKeysPipelines, null);
}
@@ -969,53 +991,6 @@
return null;
}
- @Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(
- IDataSource<DataSourceId> dataSource, IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv,
- List<LogicalVariable> primaryKeys, LogicalVariable payload, List<LogicalVariable> filterKeys,
- List<LogicalVariable> additionalNonFilterFields, RecordDescriptor recordDesc, JobGenContext context,
- JobSpecification spec) throws AlgebricksException {
- DataverseName dataverseName = dataSource.getId().getDataverseName();
- String datasetName = dataSource.getId().getDatasourceName();
- Dataset dataset = findDataset(dataverseName, datasetName);
- if (dataset == null) {
- throw new AsterixException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, datasetName, dataverseName);
- }
- int numKeys = primaryKeys.size();
- int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1;
- int numOfAdditionalFields = additionalNonFilterFields == null ? 0 : additionalNonFilterFields.size();
- // Move key fields to front. [keys, record, filters]
- int[] fieldPermutation = new int[numKeys + 1 + numFilterFields + numOfAdditionalFields];
- int[] bloomFilterKeyFields = new int[numKeys];
- int i = 0;
- // set the keys' permutations
- for (LogicalVariable varKey : primaryKeys) {
- int idx = inputSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- bloomFilterKeyFields[i] = i;
- i++;
- }
- // set the record permutation
- fieldPermutation[i++] = inputSchema.findVariable(payload);
-
- // set the meta record permutation
- if (additionalNonFilterFields != null) {
- for (LogicalVariable var : additionalNonFilterFields) {
- int idx = inputSchema.findVariable(var);
- fieldPermutation[i++] = idx;
- }
- }
-
- // set the filters' permutations.
- if (numFilterFields > 0) {
- int idx = inputSchema.findVariable(filterKeys.get(0));
- fieldPermutation[i++] = idx;
- }
-
- return createPrimaryIndexUpsertOp(spec, this, dataset, recordDesc, fieldPermutation,
- context.getMissingWriterFactory());
- }
-
protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> createPrimaryIndexUpsertOp(JobSpecification spec,
MetadataProvider metadataProvider, Dataset dataset, RecordDescriptor inputRecordDesc,
int[] fieldPermutation, IMissingWriterFactory missingWriterFactory) throws AlgebricksException {
@@ -1024,7 +999,7 @@
missingWriterFactory);
}
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime(
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getExternalDatasetScanRuntime(
JobSpecification jobSpec, IAType itemType, ITypedAdapterFactory adapterFactory,
ITupleFilterFactory tupleFilterFactory, long outputLimit) throws AlgebricksException {
if (itemType.getTypeTag() != ATypeTag.OBJECT) {
@@ -1162,13 +1137,12 @@
tupleFilterFactory, isPrimary, modCallbackFactory, tuplePartitionerFactory, partitionsMap);
}
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertOrDeleteOrUpsertRuntime(
- IndexOperation indexOp, IDataSourceIndex<String, DataSourceId> dataSourceIndex,
- IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
- List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
- List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr,
- ILogicalExpression prevFilterExpr, RecordDescriptor inputRecordDesc, JobGenContext context,
- JobSpecification spec, boolean bulkload, LogicalVariable operationVar,
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexModificationRuntime(IndexOperation indexOp,
+ IDataSourceIndex<String, DataSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
+ IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+ List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
+ ILogicalExpression filterExpr, ILogicalExpression prevFilterExpr, RecordDescriptor inputRecordDesc,
+ JobGenContext context, JobSpecification spec, boolean bulkload, LogicalVariable operationVar,
List<LogicalVariable> prevSecondaryKeys, LogicalVariable prevAdditionalFilteringKey,
List<List<AlgebricksPipeline>> secondaryKeysPipelines, IOperatorSchema pipelineTopSchema)
throws AlgebricksException {
@@ -1202,33 +1176,33 @@
switch (secondaryIndex.getIndexType()) {
case BTREE:
- return getBTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
+ return getBTreeModificationRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
secondaryKeys, additionalNonKeyFields, filterFactory, prevFilterFactory, inputRecordDesc,
context, spec, indexOp, bulkload, operationVar, prevSecondaryKeys, prevAdditionalFilteringKeys);
case ARRAY:
if (bulkload) {
// In the case of bulk-load, we do not handle any nested plans. We perform the exact same behavior
// as a normal B-Tree bulk load.
- return getBTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
- secondaryKeys, additionalNonKeyFields, filterFactory, prevFilterFactory, inputRecordDesc,
- context, spec, indexOp, bulkload, operationVar, prevSecondaryKeys,
+ return getBTreeModificationRuntime(dataverseName, datasetName, indexName, propagatedSchema,
+ primaryKeys, secondaryKeys, additionalNonKeyFields, filterFactory, prevFilterFactory,
+ inputRecordDesc, context, spec, indexOp, bulkload, operationVar, prevSecondaryKeys,
prevAdditionalFilteringKeys);
} else {
- return getArrayIndexRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
- additionalNonKeyFields, inputRecordDesc, spec, indexOp, operationVar,
+ return getArrayIndexModificationRuntime(dataverseName, datasetName, indexName, propagatedSchema,
+ primaryKeys, additionalNonKeyFields, inputRecordDesc, spec, indexOp, operationVar,
secondaryKeysPipelines);
}
case RTREE:
- return getRTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
+ return getRTreeModificationRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
secondaryKeys, additionalNonKeyFields, filterFactory, prevFilterFactory, inputRecordDesc,
context, spec, indexOp, bulkload, operationVar, prevSecondaryKeys, prevAdditionalFilteringKeys);
case SINGLE_PARTITION_WORD_INVIX:
case SINGLE_PARTITION_NGRAM_INVIX:
case LENGTH_PARTITIONED_WORD_INVIX:
case LENGTH_PARTITIONED_NGRAM_INVIX:
- return getInvertedIndexRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
- secondaryKeys, additionalNonKeyFields, filterFactory, prevFilterFactory, inputRecordDesc,
- context, spec, indexOp, secondaryIndex.getIndexType(), bulkload, operationVar,
+ return getInvertedIndexModificationRuntime(dataverseName, datasetName, indexName, propagatedSchema,
+ primaryKeys, secondaryKeys, additionalNonKeyFields, filterFactory, prevFilterFactory,
+ inputRecordDesc, context, spec, indexOp, secondaryIndex.getIndexType(), bulkload, operationVar,
prevSecondaryKeys, prevAdditionalFilteringKeys);
default:
throw new AlgebricksException(
@@ -1236,13 +1210,14 @@
}
}
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeRuntime(DataverseName dataverseName,
- String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
- AsterixTupleFilterFactory filterFactory, AsterixTupleFilterFactory prevFilterFactory,
- RecordDescriptor inputRecordDesc, JobGenContext context, JobSpecification spec, IndexOperation indexOp,
- boolean bulkload, LogicalVariable operationVar, List<LogicalVariable> prevSecondaryKeys,
- List<LogicalVariable> prevAdditionalFilteringKeys) throws AlgebricksException {
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeModificationRuntime(
+ DataverseName dataverseName, String datasetName, String indexName, IOperatorSchema propagatedSchema,
+ List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+ List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory,
+ AsterixTupleFilterFactory prevFilterFactory, RecordDescriptor inputRecordDesc, JobGenContext context,
+ JobSpecification spec, IndexOperation indexOp, boolean bulkload, LogicalVariable operationVar,
+ List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
+ throws AlgebricksException {
Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
int numKeys = primaryKeys.size() + secondaryKeys.size();
int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1;
@@ -1332,10 +1307,11 @@
}
}
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getArrayIndexRuntime(DataverseName dataverseName,
- String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
- List<LogicalVariable> additionalNonKeyFields, RecordDescriptor inputRecordDesc, JobSpecification spec,
- IndexOperation indexOp, LogicalVariable operationVar, List<List<AlgebricksPipeline>> secondaryKeysPipelines)
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getArrayIndexModificationRuntime(
+ DataverseName dataverseName, String datasetName, String indexName, IOperatorSchema propagatedSchema,
+ List<LogicalVariable> primaryKeys, List<LogicalVariable> additionalNonKeyFields,
+ RecordDescriptor inputRecordDesc, JobSpecification spec, IndexOperation indexOp,
+ LogicalVariable operationVar, List<List<AlgebricksPipeline>> secondaryKeysPipelines)
throws AlgebricksException {
Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
@@ -1397,13 +1373,14 @@
}
}
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeRuntime(DataverseName dataverseName,
- String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
- AsterixTupleFilterFactory filterFactory, AsterixTupleFilterFactory prevFilterFactory,
- RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, IndexOperation indexOp,
- boolean bulkload, LogicalVariable operationVar, List<LogicalVariable> prevSecondaryKeys,
- List<LogicalVariable> prevAdditionalFilteringKeys) throws AlgebricksException {
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeModificationRuntime(
+ DataverseName dataverseName, String datasetName, String indexName, IOperatorSchema propagatedSchema,
+ List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+ List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory,
+ AsterixTupleFilterFactory prevFilterFactory, RecordDescriptor recordDesc, JobGenContext context,
+ JobSpecification spec, IndexOperation indexOp, boolean bulkload, LogicalVariable operationVar,
+ List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
+ throws AlgebricksException {
Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
String itemTypeName = dataset.getItemTypeName();
IAType itemType = MetadataManager.INSTANCE
@@ -1506,7 +1483,7 @@
return new Pair<>(op, splitsAndConstraint.second);
}
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInvertedIndexRuntime(
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInvertedIndexModificationRuntime(
DataverseName dataverseName, String datasetName, String indexName, IOperatorSchema propagatedSchema,
List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory,
@@ -1905,6 +1882,22 @@
validateDatabaseObjectNameImpl(objectName, sourceLoc);
}
+ public static int getNumPartitions(AlgebricksPartitionConstraint constraint) {
+ if (constraint.getPartitionConstraintType() == AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) {
+ return ((AlgebricksCountPartitionConstraint) constraint).getCount();
+ } else {
+ return ((AlgebricksAbsolutePartitionConstraint) constraint).getLocations().length;
+ }
+ }
+
+ public static int[][] getPartitionsMap(int numPartitions) {
+ int[][] map = new int[numPartitions][1];
+ for (int i = 0; i < numPartitions; i++) {
+ map[i] = new int[] { i };
+ }
+ return map;
+ }
+
private void validateDatabaseObjectNameImpl(String name, SourceLocation sourceLoc) throws AlgebricksException {
if (name == null || name.isEmpty()) {
throw new AsterixException(ErrorCode.INVALID_DATABASE_OBJECT_NAME, sourceLoc, "");
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
index 708c2c2..448f5ce 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
@@ -62,7 +62,7 @@
IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig,
IProjectionFiltrationInfo<?> projectionInfo, IProjectionFiltrationInfo<?> metaProjectionInfo)
throws AlgebricksException {
- return metadataProvider.buildBtreeRuntime(jobSpec, opSchema, typeEnv, context, true, false, null, dataset,
+ return metadataProvider.getBtreeSearchRuntime(jobSpec, opSchema, typeEnv, context, true, false, null, dataset,
sampleIndexName, null, null, true, true, false, null, null, null, tupleFilterFactory, outputLimit,
false, false, DefaultTupleProjectorFactory.INSTANCE, false);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 4596393..4a6e76e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -43,7 +43,6 @@
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
public interface IMetadataProvider<S, I> {
- IDataSource<S> findDataSource(S id) throws AlgebricksException;
/**
* Obs: A scanner may choose to contribute a null
@@ -78,6 +77,12 @@
List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor inputRecordDesc, JobGenContext context,
JobSpecification jobSpec, boolean bulkload) throws AlgebricksException;
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(IDataSource<S> dataSource,
+ IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
+ LogicalVariable payLoadVar, List<LogicalVariable> additionalFilterFields,
+ List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor recordDesc, JobGenContext context,
+ JobSpecification jobSpec) throws AlgebricksException;
+
Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(IDataSource<S> dataSource,
IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
LogicalVariable payLoadVar, List<LogicalVariable> additionalNonKeyFields,
@@ -115,6 +120,14 @@
List<List<AlgebricksPipeline>> secondaryKeysPipelines, IOperatorSchema pipelineTopSchema)
throws AlgebricksException;
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
+ IDataSourceIndex<I, S> dataSourceIndex, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
+ IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+ List<LogicalVariable> additionalFilteringKeys, ILogicalExpression filterExpr,
+ ILogicalExpression prevFilterExpr, LogicalVariable operationVar, List<LogicalVariable> prevSecondaryKeys,
+ LogicalVariable prevAdditionalFilteringKeys, RecordDescriptor inputDesc, JobGenContext context,
+ JobSpecification spec, List<List<AlgebricksPipeline>> secondaryKeysPipelines) throws AlgebricksException;
+
/**
* Creates the delete runtime of IndexInsertDeletePOperator, which models
* insert/delete operations into a secondary index.
@@ -170,24 +183,12 @@
RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, boolean bulkload)
throws AlgebricksException;
+ IDataSource<S> findDataSource(S id) throws AlgebricksException;
+
IDataSourceIndex<I, S> findDataSourceIndex(I indexId, S dataSourceId) throws AlgebricksException;
IFunctionInfo lookupFunction(FunctionIdentifier fid);
- Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(IDataSource<S> dataSource,
- IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
- LogicalVariable payLoadVar, List<LogicalVariable> additionalFilterFields,
- List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor recordDesc, JobGenContext context,
- JobSpecification jobSpec) throws AlgebricksException;
-
- Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
- IDataSourceIndex<I, S> dataSourceIndex, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
- IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
- List<LogicalVariable> additionalFilteringKeys, ILogicalExpression filterExpr,
- ILogicalExpression prevFilterExpr, LogicalVariable operationVar, List<LogicalVariable> prevSecondaryKeys,
- LogicalVariable prevAdditionalFilteringKeys, RecordDescriptor inputDesc, JobGenContext context,
- JobSpecification spec, List<List<AlgebricksPipeline>> secondaryKeysPipelines) throws AlgebricksException;
-
ITupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
ILogicalExpression filterExpr, JobGenContext context) throws AlgebricksException;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
index 1c961c5..c46391f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
@@ -57,7 +57,7 @@
protected final long outputLimit;
protected final ITupleProjectorFactory tupleProjectorFactory;
protected final ITuplePartitionerFactory tuplePartitionerFactory;
- protected final int[][] map;
+ protected final int[][] partitionsMap;
public BTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
@@ -79,7 +79,7 @@
IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory tupleFilterFactory, long outputLimit,
boolean appendOpCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory tupleProjectorFactory,
- ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) {
+ ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) {
super(spec, 1, 1);
this.indexHelperFactory = indexHelperFactory;
this.retainInput = retainInput;
@@ -102,7 +102,7 @@
this.searchCallbackProceedResultTrueValue = searchCallbackProceedResultTrueValue;
this.tupleProjectorFactory = tupleProjectorFactory;
this.tuplePartitionerFactory = tuplePartitionerFactory;
- this.map = map;
+ this.partitionsMap = partitionsMap;
}
@Override
@@ -114,7 +114,7 @@
retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter,
nonFilterWriterFactory, tupleFilterFactory, outputLimit, appendOpCallbackProceedResult,
searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue, tupleProjectorFactory,
- tuplePartitionerFactory, map);
+ tuplePartitionerFactory, partitionsMap);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index 24163ea..3fd0cf9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -68,12 +68,12 @@
IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory tupleFilterFactory, long outputLimit,
boolean appendOpCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory projectorFactory,
- ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) throws HyracksDataException {
+ ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) throws HyracksDataException {
super(ctx, inputRecDesc, partition, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
retainInput, retainMissing, nonMatchWriterFactory, searchCallbackFactory, appendIndexFilter,
nonFilterWriterFactory, tupleFilterFactory, outputLimit, appendOpCallbackProceedResult,
searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue, projectorFactory,
- tuplePartitionerFactory, map);
+ tuplePartitionerFactory, partitionsMap);
this.lowKeyInclusive = lowKeyInclusive;
this.highKeyInclusive = highKeyInclusive;
if (lowKeyFields != null && lowKeyFields.length > 0) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index c6cdd66..88c06eb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -128,10 +128,10 @@
IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory tupleFilterFactory, long outputLimit,
boolean appendSearchCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory projectorFactory,
- ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) throws HyracksDataException {
+ ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) throws HyracksDataException {
this.ctx = ctx;
this.appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
- this.partitions = map != null ? map[partition] : new int[] { partition };
+ this.partitions = partitionsMap != null ? partitionsMap[partition] : new int[] { partition };
for (int i = 0; i < partitions.length; i++) {
storagePartitionId2Index.put(partitions[i], i);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
index 9ed0782..4d8f1fe 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
@@ -41,11 +41,11 @@
IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory,
int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, ITupleFilterFactory tupleFilterFactory,
long outputLimit, ITupleProjectorFactory tupleProjectorFactory,
- ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) {
+ ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) {
super(spec, outRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory,
retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, minFilterFieldIndexes,
maxFilterFieldIndexes, false, null, tupleFilterFactory, outputLimit, false, null, null,
- tupleProjectorFactory, tuplePartitionerFactory, map);
+ tupleProjectorFactory, tuplePartitionerFactory, partitionsMap);
}
@Override
@@ -55,7 +55,7 @@
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), lowKeyFields, highKeyFields,
lowKeyInclusive, highKeyInclusive, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, tupleFilterFactory,
- outputLimit, tupleProjectorFactory, tuplePartitionerFactory, map);
+ outputLimit, tupleProjectorFactory, tuplePartitionerFactory, partitionsMap);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
index 47d515a..8c8a550 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
@@ -54,11 +54,11 @@
IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing,
IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory,
ITupleFilterFactory tupleFilterFactory, long outputLimit, ITupleProjectorFactory tupleProjectorFactory,
- ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) throws HyracksDataException {
+ ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) throws HyracksDataException {
super(ctx, partition, inputRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
minFilterKeyFields, maxFilterKeyFields, indexHelperFactory, retainInput, retainMissing,
missingWriterFactory, searchCallbackFactory, false, null, tupleFilterFactory, outputLimit, false, null,
- null, tupleProjectorFactory, tuplePartitionerFactory, map);
+ null, tupleProjectorFactory, tuplePartitionerFactory, partitionsMap);
this.keyFields = lowKeyFields;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorDescriptor.java
index b5b951d..fcdf792 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorDescriptor.java
@@ -35,7 +35,7 @@
import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
public class LSMInvertedIndexSearchOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final int queryField;
private final IInvertedIndexSearchModifierFactory searchModifierFactory;
@@ -54,6 +54,7 @@
private final int numOfFields;
// the maximum number of frames that this inverted-index-search can use
private final int frameLimit;
+ private final int[][] partitionsMap;
public LSMInvertedIndexSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
int queryField, IIndexDataflowHelperFactory indexHelperFactory,
@@ -62,7 +63,8 @@
IInvertedIndexSearchModifierFactory searchModifierFactory, boolean retainInput, boolean retainMissing,
IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory,
int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, boolean isFullTextSearchQuery, int numOfFields,
- boolean appendIndexFilter, IMissingWriterFactory nonFilterWriterFactory, int frameLimit) {
+ boolean appendIndexFilter, IMissingWriterFactory nonFilterWriterFactory, int frameLimit,
+ int[][] partitionsMap) {
super(spec, 1, 1);
this.indexHelperFactory = indexHelperFactory;
this.queryTokenizerFactory = queryTokenizerFactory;
@@ -79,6 +81,7 @@
this.appendIndexFilter = appendIndexFilter;
this.nonFilterWriterFactory = nonFilterWriterFactory;
this.numOfFields = numOfFields;
+ this.partitionsMap = partitionsMap;
this.outRecDescs[0] = outRecDesc;
this.frameLimit = frameLimit;
}
@@ -91,6 +94,7 @@
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), partition, minFilterFieldIndexes,
maxFilterFieldIndexes, indexHelperFactory, retainInput, retainMissing, missingWriterFactory,
searchCallbackFactory, searchModifier, queryTokenizerFactory, fullTextConfigEvaluatorFactory,
- queryField, isFullTextSearchQuery, numOfFields, appendIndexFilter, nonFilterWriterFactory, frameLimit);
+ queryField, isFullTextSearchQuery, numOfFields, appendIndexFilter, nonFilterWriterFactory, frameLimit,
+ partitionsMap);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java
index 996241d..742a86c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java
@@ -65,10 +65,12 @@
IInvertedIndexSearchModifier searchModifier, IBinaryTokenizerFactory binaryTokenizerFactory,
IFullTextConfigEvaluatorFactory fullTextConfigEvaluatorFactory, int queryFieldIndex,
boolean isFullTextSearchQuery, int numOfFields, boolean appendIndexFilter,
- IMissingWriterFactory nonFilterWriterFactory, int frameLimit) throws HyracksDataException {
+ IMissingWriterFactory nonFilterWriterFactory, int frameLimit, int[][] partitionsMap)
+ throws HyracksDataException {
super(ctx, inputRecDesc, partition, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter,
- nonFilterWriterFactory, null, -1, false, null, null, DefaultTupleProjectorFactory.INSTANCE, null, null);
+ nonFilterWriterFactory, null, -1, false, null, null, DefaultTupleProjectorFactory.INSTANCE, null,
+ partitionsMap);
this.searchModifier = searchModifier;
this.binaryTokenizerFactory = binaryTokenizerFactory;
this.fullTextConfigEvaluatorFactory = fullTextConfigEvaluatorFactory;