ASTERIXDB-1238: Refactor AqlMetadataProvider
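
Metadata lookups that previously lived inline in this class (findDataset,
findType, findNodeDomain, findFeed, findFeedPolicy, findDataSource,
lookupSourceInMetadata, getDatasetIndexes, findOutputRecordType) now
delegate to MetadataManagerUtil, and lookups that must succeed go through
MetadataManagerUtil.findExistingDataset. A minimal sketch of the resulting
delegation shape, with the MetadataManagerUtil signatures assumed from
their call sites in this patch (illustrative only, not part of the diff
below):

    // Provider methods keep their public signatures and forward to the
    // shared utility, passing the current metadata transaction context.
    public IAType findType(String dataverse, String typeName) throws AlgebricksException {
        return MetadataManagerUtil.findType(mdTxnCtx, dataverse, typeName);
    }

    // Call sites that used to null-check findDataset(...) themselves now
    // rely on the utility to fail when the dataset is missing.
    Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);

This keeps AqlMetadataProvider focused on runtime/job construction while
the lookup and validation logic is shared in one place.
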
Change-Id: If2720817c5659622e1f713653856825d612eb892
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1016
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index 72360b6..0975163 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -68,17 +68,14 @@
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.dataset.hints.DatasetHints.DatasetCardinalityHint;
-import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.DatasourceAdapter;
-import org.apache.asterix.metadata.entities.Datatype;
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
import org.apache.asterix.metadata.entities.Feed;
import org.apache.asterix.metadata.entities.FeedPolicyEntity;
import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.entities.NodeGroup;
import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
import org.apache.asterix.metadata.utils.DatasetUtils;
import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
@@ -123,11 +120,9 @@
import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.core.jobgen.impl.OperatorSchemaImpl;
import org.apache.hyracks.algebricks.data.IAWriterFactory;
import org.apache.hyracks.algebricks.data.IPrinterFactory;
import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
@@ -174,23 +169,27 @@
public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> {
+ private final AsterixStorageProperties storageProperties;
+ private final ILibraryManager libraryManager;
+ private final Dataverse defaultDataverse;
+
private MetadataTransactionContext mdTxnCtx;
private boolean isWriteTransaction;
- private final Map<String, String[]> stores;
private Map<String, String> config;
private IAWriterFactory writerFactory;
private FileSplit outputFile;
private boolean asyncResults;
private ResultSetId resultSetId;
private IResultSerializerFactoryProvider resultSerializerFactoryProvider;
-
- private final Dataverse defaultDataverse;
private JobId jobId;
private Map<String, Integer> locks;
private boolean isTemporaryDatasetWriteJob = true;
- private final AsterixStorageProperties storageProperties;
- private final ILibraryManager libraryManager;
+ public AqlMetadataProvider(Dataverse defaultDataverse) {
+ this.defaultDataverse = defaultDataverse;
+ this.storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+ this.libraryManager = AsterixAppContextInfo.getInstance().getLibraryManager();
+ }
public String getPropertyValue(String propertyName) {
return config.get(propertyName);
@@ -200,21 +199,10 @@
this.config = config;
}
- public Map<String, String[]> getAllStores() {
- return stores;
- }
-
public Map<String, String> getConfig() {
return config;
}
- public AqlMetadataProvider(Dataverse defaultDataverse) {
- this.defaultDataverse = defaultDataverse;
- this.stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
- this.storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
- this.libraryManager = AsterixAppContextInfo.getInstance().getLibraryManager();
- }
-
public ILibraryManager getLibraryManager() {
return libraryManager;
}
@@ -283,35 +271,6 @@
return resultSerializerFactoryProvider;
}
- /**
- * Retrieve the Output RecordType, as defined by "set output-record-type".
- */
- public ARecordType findOutputRecordType() throws AlgebricksException {
- String outputRecordType = getPropertyValue("output-record-type");
- if (outputRecordType == null) {
- return null;
- }
- String dataverse = getDefaultDataverseName();
- if (dataverse == null) {
- throw new AlgebricksException("Cannot declare output-record-type with no dataverse!");
- }
- IAType type = findType(dataverse, outputRecordType);
- if (!(type instanceof ARecordType)) {
- throw new AlgebricksException("Type " + outputRecordType + " is not a record type!");
- }
- return (ARecordType) type;
- }
-
- @Override
- public AqlDataSource findDataSource(AqlSourceId id) throws AlgebricksException {
- AqlSourceId aqlId = id;
- try {
- return lookupSourceInMetadata(aqlId);
- } catch (MetadataException e) {
- throw new AlgebricksException(e);
- }
- }
-
public boolean isWriteTransaction() {
// The transaction writes persistent datasets.
return isWriteTransaction;
@@ -322,6 +281,98 @@
return isTemporaryDatasetWriteJob;
}
+ public IDataFormat getFormat() {
+ return FormatUtils.getDefaultFormat();
+ }
+
+ public AsterixStorageProperties getStorageProperties() {
+ return storageProperties;
+ }
+
+ public Map<String, Integer> getLocks() {
+ return locks;
+ }
+
+ public void setLocks(Map<String, Integer> locks) {
+ this.locks = locks;
+ }
+
+ /**
+ * Retrieve the Output RecordType, as defined by "set output-record-type".
+ */
+ public ARecordType findOutputRecordType() throws AlgebricksException {
+ return MetadataManagerUtil.findOutputRecordType(mdTxnCtx, getDefaultDataverseName(),
+ getPropertyValue("output-record-type"));
+ }
+
+ public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException {
+ String dv = dataverse == null ? (defaultDataverse == null ? null : defaultDataverse.getDataverseName())
+ : dataverse;
+ if (dv == null) {
+ return null;
+ }
+ return MetadataManagerUtil.findDataset(mdTxnCtx, dv, dataset);
+ }
+
+ public INodeDomain findNodeDomain(String nodeGroupName) throws AlgebricksException {
+ return MetadataManagerUtil.findNodeDomain(mdTxnCtx, nodeGroupName);
+ }
+
+ public IAType findType(String dataverse, String typeName) throws AlgebricksException {
+ return MetadataManagerUtil.findType(mdTxnCtx, dataverse, typeName);
+ }
+
+ public Feed findFeed(String dataverse, String feedName) throws AlgebricksException {
+ return MetadataManagerUtil.findFeed(mdTxnCtx, dataverse, feedName);
+ }
+
+ public FeedPolicyEntity findFeedPolicy(String dataverse, String policyName) throws AlgebricksException {
+ return MetadataManagerUtil.findFeedPolicy(mdTxnCtx, dataverse, policyName);
+ }
+
+ @Override
+ public AqlDataSource findDataSource(AqlSourceId id) throws AlgebricksException {
+ return MetadataManagerUtil.findDataSource(mdTxnCtx, id);
+ }
+
+ public AqlDataSource lookupSourceInMetadata(AqlSourceId aqlId) throws AlgebricksException {
+ return MetadataManagerUtil.lookupSourceInMetadata(mdTxnCtx, aqlId);
+ }
+
+ @Override
+ public IDataSourceIndex<String, AqlSourceId> findDataSourceIndex(String indexId, AqlSourceId dataSourceId)
+ throws AlgebricksException {
+ AqlDataSource ads = findDataSource(dataSourceId);
+ Dataset dataset = ((DatasetDataSource) ads).getDataset();
+ try {
+ String indexName = indexId;
+ Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+ dataset.getDatasetName(), indexName);
+ if (secondaryIndex != null) {
+ return new AqlIndex(secondaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
+ } else {
+ Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+ dataset.getDatasetName(), dataset.getDatasetName());
+ if (primaryIndex.getIndexName().equals(indexId)) {
+ return new AqlIndex(primaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
+ } else {
+ return null;
+ }
+ }
+ } catch (MetadataException me) {
+ throw new AlgebricksException(me);
+ }
+ }
+
+ public List<Index> getDatasetIndexes(String dataverseName, String datasetName) throws AlgebricksException {
+ return MetadataManagerUtil.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
+ }
+
+ @Override
+ public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
+ return AsterixBuiltinFunctions.lookupFunction(fid);
+ }
+
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
IDataSource<AqlSourceId> dataSource, List<LogicalVariable> scanVariables,
@@ -337,8 +388,7 @@
}
}
- public static AlgebricksAbsolutePartitionConstraint determineLocationConstraint(FeedDataSource feedDataSource)
- throws AsterixException {
+ public static AlgebricksAbsolutePartitionConstraint determineLocationConstraint(FeedDataSource feedDataSource) {
return new AlgebricksAbsolutePartitionConstraint(feedDataSource.getLocations());
}
@@ -367,64 +417,9 @@
return format;
}
- protected IAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName,
- Map<String, String> configuration, ARecordType itemType, boolean isPKAutoGenerated,
- List<List<String>> primaryKeys, ARecordType metaType) throws AlgebricksException {
- try {
- configuration.put(ExternalDataConstants.KEY_DATAVERSE, dataset.getDataverseName());
- IAdapterFactory adapterFactory = AdapterFactoryProvider.getAdapterFactory(libraryManager, adapterName,
- configuration, itemType, metaType);
-
- // check to see if dataset is indexed
- Index filesIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
- dataset.getDatasetName(),
- dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX));
-
- if (filesIndex != null && filesIndex.getPendingOp() == 0) {
- // get files
- List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
- Iterator<ExternalFile> iterator = files.iterator();
- while (iterator.hasNext()) {
- if (iterator.next().getPendingOp() != ExternalFilePendingOp.PENDING_NO_OP) {
- iterator.remove();
- }
- }
- // TODO Check this call, result of merge from master!
- // ((IGenericAdapterFactory) adapterFactory).setFiles(files);
- }
-
- return adapterFactory;
- } catch (Exception e) {
- throw new AlgebricksException("Unable to create adapter", e);
- }
- }
-
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime(
- JobSpecification jobSpec, IAType itemType, IAdapterFactory adapterFactory, IDataFormat format)
- throws AlgebricksException {
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Can only scan datasets of records.");
- }
-
- ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
- RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
-
- ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec, scannerDesc,
- adapterFactory);
-
- AlgebricksPartitionConstraint constraint;
- try {
- constraint = adapterFactory.getPartitionConstraint();
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
-
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataScanner, constraint);
- }
-
public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> buildFeedIntakeRuntime(
JobSpecification jobSpec, Feed primaryFeed, FeedPolicyAccessor policyAccessor) throws Exception {
- Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> factoryOutput = null;
+ Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> factoryOutput;
factoryOutput = FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, policyAccessor, mdTxnCtx,
libraryManager);
ARecordType recordType = FeedMetadataUtil.getOutputType(primaryFeed, primaryFeed.getAdapterConfiguration(),
@@ -445,8 +440,7 @@
}
AlgebricksPartitionConstraint partitionConstraint = adapterFactory.getPartitionConstraint();
- return new Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory>(feedIngestor,
- partitionConstraint, adapterFactory);
+ return new Triple<>(feedIngestor, partitionConstraint, adapterFactory);
}
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
@@ -454,7 +448,6 @@
JobGenContext context, boolean retainInput, boolean retainMissing, Dataset dataset, String indexName,
int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
Object implConfig, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException {
-
boolean isSecondary = true;
int numSecondaryKeys = 0;
try {
@@ -496,10 +489,10 @@
}
Pair<IBinaryComparatorFactory[], ITypeTraits[]> comparatorFactoriesAndTypeTraits =
getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(
- secondaryIndex.getIndexType(), secondaryIndex.getKeyFieldNames(),
- secondaryIndex.getKeyFieldTypes(), DatasetUtils.getPartitioningKeys(dataset), itemType,
- dataset.getDatasetType(), dataset.hasMetaPart(), primaryKeyIndicators,
- secondaryIndex.getKeyFieldSourceIndicators(), metaType);
+ secondaryIndex.getKeyFieldNames(), secondaryIndex.getKeyFieldTypes(),
+ DatasetUtils.getPartitioningKeys(dataset), itemType, dataset.getDatasetType(),
+ dataset.hasMetaPart(), primaryKeyIndicators, secondaryIndex.getKeyFieldSourceIndicators(),
+ metaType);
comparatorFactories = comparatorFactoriesAndTypeTraits.first;
typeTraits = comparatorFactoriesAndTypeTraits.second;
if (filterTypeTraits != null) {
@@ -519,22 +512,18 @@
// get meta item type
ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
- comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset, itemType,
- metaItemType, context.getBinaryComparatorFactoryProvider());
+ comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset, itemType, metaItemType,
+ context.getBinaryComparatorFactoryProvider());
filterFields = DatasetUtils.createFilterFields(dataset);
btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
}
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
- try {
- spPc = splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(),
- dataset.getDatasetName(), indexName, temp);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
+ spPc = splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(), dataset.getDatasetName(),
+ indexName, temp);
- ISearchOperationCallbackFactory searchCallbackFactory = null;
+ ISearchOperationCallbackFactory searchCallbackFactory;
if (isSecondary) {
searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
: new SecondaryIndexSearchOperationCallbackFactory();
@@ -580,83 +569,29 @@
int[] buddyBreeFields = new int[] { numSecondaryKeys };
ExternalBTreeWithBuddyDataflowHelperFactory indexDataflowHelperFactory =
new ExternalBTreeWithBuddyDataflowHelperFactory(
- compactionInfo.first, compactionInfo.second,
- new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
- getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBreeFields,
- ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
+ compactionInfo.first, compactionInfo.second,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
+ getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBreeFields,
+ ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, rtcProvider,
rtcProvider, spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields,
highKeyFields, lowKeyInclusive, highKeyInclusive, indexDataflowHelperFactory, retainInput,
retainMissing, context.getMissingWriterFactory(), searchCallbackFactory);
}
-
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
-
+ return new Pair<>(btreeSearchOp, spPc.second);
} catch (MetadataException me) {
throw new AlgebricksException(me);
}
}
- private Pair<IBinaryComparatorFactory[], ITypeTraits[]> getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(
- IndexType indexType, List<List<String>> sidxKeyFieldNames, List<IAType> sidxKeyFieldTypes,
- List<List<String>> pidxKeyFieldNames, ARecordType recType, DatasetType dsType, boolean hasMeta,
- List<Integer> primaryIndexKeyIndicators, List<Integer> secondaryIndexIndicators, ARecordType metaType)
- throws AlgebricksException {
-
- IBinaryComparatorFactory[] comparatorFactories;
- ITypeTraits[] typeTraits;
- int sidxKeyFieldCount = sidxKeyFieldNames.size();
- int pidxKeyFieldCount = pidxKeyFieldNames.size();
- typeTraits = new ITypeTraits[sidxKeyFieldCount + pidxKeyFieldCount];
- comparatorFactories = new IBinaryComparatorFactory[sidxKeyFieldCount + pidxKeyFieldCount];
-
- int i = 0;
- for (; i < sidxKeyFieldCount; ++i) {
- Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i),
- sidxKeyFieldNames.get(i),
- (hasMeta && secondaryIndexIndicators.get(i).intValue() == 1) ? metaType : recType);
- IAType keyType = keyPairType.first;
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
- true);
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- }
-
- for (int j = 0; j < pidxKeyFieldCount; ++j, ++i) {
- IAType keyType = null;
- try {
- switch (dsType) {
- case INTERNAL:
- keyType = (hasMeta && primaryIndexKeyIndicators.get(j).intValue() == 1)
- ? metaType.getSubFieldType(pidxKeyFieldNames.get(j))
- : recType.getSubFieldType(pidxKeyFieldNames.get(j));
- break;
- case EXTERNAL:
- keyType = IndexingConstants.getFieldType(j);
- break;
- default:
- throw new AlgebricksException("Unknown Dataset Type");
- }
- } catch (AsterixException e) {
- throw new AlgebricksException(e);
- }
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
- true);
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- }
-
- return new Pair<IBinaryComparatorFactory[], ITypeTraits[]>(comparatorFactories, typeTraits);
- }
-
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(JobSpecification jobSpec,
List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
JobGenContext context, boolean retainInput, boolean retainMissing, Dataset dataset, String indexName,
int[] keyFields, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException {
-
try {
- ARecordType recType = (ARecordType) findType(dataset.getItemTypeDataverseName(),
- dataset.getItemTypeName());
+ ARecordType recType = (ARecordType) findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
boolean temp = dataset.getDatasetDetails().isTemp();
@@ -670,9 +605,9 @@
List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
int numSecondaryKeys = secondaryKeyFields.size();
if (numSecondaryKeys != 1) {
- throw new AlgebricksException("Cannot use " + numSecondaryKeys
- + " fields as a key for the R-tree index. "
- + "There can be only one field as a key for the R-tree index.");
+ throw new AlgebricksException(
+ "Cannot use " + numSecondaryKeys + " fields as a key for the R-tree index. "
+ + "There can be only one field as a key for the R-tree index.");
}
Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
secondaryKeyFields.get(0), recType);
@@ -701,8 +636,8 @@
numNestedSecondaryKeyFields + numPrimaryKeys, typeEnv, context);
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
- splitProviderAndPartitionConstraintsForDataset(
- dataset.getDataverseName(), dataset.getDatasetName(), indexName, temp);
+ splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(),
+ dataset.getDatasetName(), indexName, temp);
ARecordType metaType = null;
if (dataset.hasMetaPart()) {
metaType = (ARecordType) findType(dataset.getMetaItemTypeDataverseName(),
@@ -770,28 +705,12 @@
retainMissing, context.getMissingWriterFactory(), searchCallbackFactory);
}
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeSearchOp, spPc.second);
-
+ return new Pair<>(rtreeSearchOp, spPc.second);
} catch (MetadataException me) {
throw new AlgebricksException(me);
}
}
- private IBinaryComparatorFactory[] getMergedComparatorFactories(IBinaryComparatorFactory[] comparatorFactories,
- IBinaryComparatorFactory[] primaryComparatorFactories) {
- IBinaryComparatorFactory[] btreeCompFactories = null;
- int btreeCompFactoriesCount = comparatorFactories.length + primaryComparatorFactories.length;
- btreeCompFactories = new IBinaryComparatorFactory[btreeCompFactoriesCount];
- int i = 0;
- for (; i < comparatorFactories.length; i++) {
- btreeCompFactories[i] = comparatorFactories[i];
- }
- for (int j = 0; i < btreeCompFactoriesCount; i++, j++) {
- btreeCompFactories[i] = primaryComparatorFactories[j];
- }
- return btreeCompFactories;
- }
-
@Override
public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {
@@ -804,7 +723,7 @@
SinkWriterRuntimeFactory runtime = new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile,
getWriterFactory(), inputDesc);
AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(new String[] { nodeId });
- return new Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>(runtime, apc);
+ return new Pair<>(runtime, apc);
}
@Override
@@ -814,7 +733,6 @@
ResultSetDataSink rsds = (ResultSetDataSink) sink;
ResultSetSinkId rssId = rsds.getId();
ResultSetId rsId = rssId.getResultSetId();
-
ResultWriterOperatorDescriptor resultWriter = null;
try {
IResultSerializerFactory resultSerializedAppenderFactory = resultSerializerFactoryProvider
@@ -824,48 +742,7 @@
} catch (IOException e) {
throw new AlgebricksException(e);
}
-
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(resultWriter, null);
- }
-
- @Override
- public IDataSourceIndex<String, AqlSourceId> findDataSourceIndex(String indexId, AqlSourceId dataSourceId)
- throws AlgebricksException {
- AqlDataSource ads = findDataSource(dataSourceId);
- Dataset dataset = ((DatasetDataSource) ads).getDataset();
-
- try {
- String indexName = indexId;
- Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
- dataset.getDatasetName(), indexName);
- if (secondaryIndex != null) {
- return new AqlIndex(secondaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
- } else {
- Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
- dataset.getDatasetName(), dataset.getDatasetName());
- if (primaryIndex.getIndexName().equals(indexId)) {
- return new AqlIndex(primaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
- } else {
- return null;
- }
- }
- } catch (MetadataException me) {
- throw new AlgebricksException(me);
- }
- }
-
- public AqlDataSource lookupSourceInMetadata(AqlSourceId aqlId) throws AlgebricksException, MetadataException {
- Dataset dataset = findDataset(aqlId.getDataverseName(), aqlId.getDatasourceName());
- if (dataset == null) {
- throw new AlgebricksException("Datasource with id " + aqlId + " was not found.");
- }
- IAType itemType = findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
- IAType metaItemType = findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
- INodeDomain domain = findNodeDomain(dataset.getNodeGroupName());
- byte datasourceType = dataset.getDatasetType().equals(DatasetType.EXTERNAL)
- ? AqlDataSourceType.EXTERNAL_DATASET : AqlDataSourceType.INTERNAL_DATASET;
- return new DatasetDataSource(aqlId, dataset, itemType, metaItemType, datasourceType,
- dataset.getDatasetDetails(), domain);
+ return new Pair<>(resultWriter, null);
}
@Override
@@ -876,18 +753,13 @@
String dataverseName = dataSource.getId().getDataverseName();
String datasetName = dataSource.getId().getDatasourceName();
- Dataset dataset = findDataset(dataverseName, datasetName);
- if (dataset == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
- }
-
+ Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
int numKeys = keys.size();
int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
// move key fields to front
int[] fieldPermutation = new int[numKeys + 1 + numFilterFields];
int[] bloomFilterKeyFields = new int[numKeys];
- // System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
int i = 0;
for (LogicalVariable varKey : keys) {
int idx = propagatedSchema.findVariable(varKey);
@@ -923,8 +795,8 @@
itemType, metaType, context.getBinaryComparatorFactoryProvider());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- splitProviderAndPartitionConstraintsForDataset(
- dataSource.getId().getDataverseName(), datasetName, indexName, temp);
+ splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(),
+ datasetName, indexName, temp);
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
long numElementsHint = getCardinalityPerPartitionHint(dataset);
@@ -953,114 +825,7 @@
LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits,
filterCmpFactories, btreeFields, filterFields, !temp));
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
- splitsAndConstraint.second);
- } catch (MetadataException me) {
- throw new AlgebricksException(me);
- }
- }
-
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertOrDeleteRuntime(IndexOperation indexOp,
- IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
- List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
- RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, boolean bulkload,
- List<LogicalVariable> additionalNonFilteringFields) throws AlgebricksException {
-
- String datasetName = dataSource.getId().getDatasourceName();
- Dataset dataset = findDataset(dataSource.getId().getDataverseName(), datasetName);
- if (dataset == null) {
- throw new AlgebricksException(
- "Unknown dataset " + datasetName + " in dataverse " + dataSource.getId().getDataverseName());
- }
- boolean temp = dataset.getDatasetDetails().isTemp();
- isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
-
- int numKeys = keys.size();
- int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
- // Move key fields to front.
- int[] fieldPermutation = new int[numKeys + 1 + numFilterFields
- + (additionalNonFilteringFields == null ? 0 : additionalNonFilteringFields.size())];
- int[] bloomFilterKeyFields = new int[numKeys];
- int i = 0;
- for (LogicalVariable varKey : keys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- bloomFilterKeyFields[i] = i;
- i++;
- }
- fieldPermutation[i++] = propagatedSchema.findVariable(payload);
- if (numFilterFields > 0) {
- int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
- fieldPermutation[i++] = idx;
- }
- if (additionalNonFilteringFields != null) {
- for (LogicalVariable variable : additionalNonFilteringFields) {
- int idx = propagatedSchema.findVariable(variable);
- fieldPermutation[i++] = idx;
- }
- }
-
- try {
- Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
- dataset.getDatasetName(), dataset.getDatasetName());
- String indexName = primaryIndex.getIndexName();
- ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
- .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), dataset.getItemTypeName()).getDatatype();
- ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
- ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
-
- IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
- IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
- itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- splitProviderAndPartitionConstraintsForDataset(
- dataSource.getId().getDataverseName(), datasetName, indexName, temp);
-
- // prepare callback
- JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
- int datasetId = dataset.getDatasetId();
- int[] primaryKeyFields = new int[numKeys];
- for (i = 0; i < numKeys; i++) {
- primaryKeyFields[i] = i;
- }
-
- ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
- IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
- itemType, context.getBinaryComparatorFactoryProvider());
- int[] filterFields = DatasetUtils.createFilterFields(dataset);
- int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
-
- TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
- IModificationOperationCallbackFactory modificationCallbackFactory = temp
- ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
- : new PrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
- txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE, dataset.hasMetaPart());
-
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
- .getMergePolicyFactory(dataset, mdTxnCtx);
- IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
- new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
- new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
- storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
- btreeFields, filterFields, !temp);
- IOperatorDescriptor op;
- if (bulkload) {
- long numElementsHint = getCardinalityPerPartitionHint(dataset);
- op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
- appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
- comparatorFactories, bloomFilterKeyFields, fieldPermutation,
- GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh);
- } else {
- op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
- appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
- splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
- fieldPermutation, indexOp, idfh, null, true, indexName, null, modificationCallbackFactory,
- NoOpOperationCallbackFactory.INSTANCE);
- }
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
-
+ return new Pair<>(btreeBulkLoad, splitsAndConstraint.second);
} catch (MetadataException me) {
throw new AlgebricksException(me);
}
@@ -1072,7 +837,7 @@
List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor recordDesc, JobGenContext context,
JobSpecification spec, boolean bulkload) throws AlgebricksException {
- return getInsertOrDeleteRuntime(IndexOperation.INSERT, dataSource, propagatedSchema, typeEnv, keys, payload,
+ return getInsertOrDeleteRuntime(IndexOperation.INSERT, dataSource, propagatedSchema, keys, payload,
additionalNonKeyFields, recordDesc, context, spec, bulkload, additionalNonFilteringFields);
}
@@ -1081,58 +846,10 @@
IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec) throws AlgebricksException {
- return getInsertOrDeleteRuntime(IndexOperation.DELETE, dataSource, propagatedSchema, typeEnv, keys, payload,
+ return getInsertOrDeleteRuntime(IndexOperation.DELETE, dataSource, propagatedSchema, keys, payload,
additionalNonKeyFields, recordDesc, context, spec, false, null);
}
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertOrDeleteRuntime(
- IndexOperation indexOp, IDataSourceIndex<String, AqlSourceId> dataSourceIndex,
- IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
- List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
- List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
- JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException {
- String indexName = dataSourceIndex.getId();
- String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
- String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
-
- Dataset dataset = findDataset(dataverseName, datasetName);
- if (dataset == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName);
- }
- Index secondaryIndex;
- try {
- secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
- dataset.getDatasetName(), indexName);
- } catch (MetadataException e) {
- throw new AlgebricksException(e);
- }
- AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
- switch (secondaryIndex.getIndexType()) {
- case BTREE: {
- return getBTreeDmlRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
- secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
- bulkload);
- }
- case RTREE: {
- return getRTreeDmlRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
- secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
- bulkload);
- }
- case SINGLE_PARTITION_WORD_INVIX:
- case SINGLE_PARTITION_NGRAM_INVIX:
- case LENGTH_PARTITIONED_WORD_INVIX:
- case LENGTH_PARTITIONED_NGRAM_INVIX: {
- return getInvertedIndexDmlRuntime(dataverseName, datasetName, indexName, propagatedSchema, typeEnv,
- primaryKeys, secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec,
- indexOp, secondaryIndex.getIndexType(), bulkload);
- }
- default: {
- throw new AlgebricksException(
- "Insert and delete not implemented for index type: " + secondaryIndex.getIndexType());
- }
- }
- }
-
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
@@ -1140,9 +857,34 @@
List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
boolean bulkload) throws AlgebricksException {
- return getIndexInsertOrDeleteRuntime(IndexOperation.INSERT, dataSourceIndex, propagatedSchema, inputSchemas,
- typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc, context, spec,
- bulkload);
+ return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.INSERT, dataSourceIndex, propagatedSchema,
+ inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc,
+ context, spec, bulkload, null, null);
+ }
+
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
+ IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
+ IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+ List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
+ ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
+ throws AlgebricksException {
+ return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema,
+ inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc,
+ context, spec, false, null, null);
+ }
+
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
+ IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
+ IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+ List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalFilteringKeys,
+ ILogicalExpression filterExpr, List<LogicalVariable> prevSecondaryKeys,
+ LogicalVariable prevAdditionalFilteringKey, RecordDescriptor recordDesc, JobGenContext context,
+ JobSpecification spec) throws AlgebricksException {
+ return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.UPSERT, dataSourceIndex, propagatedSchema,
+ inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, recordDesc,
+ context, spec, false, prevSecondaryKeys, prevAdditionalFilteringKey);
}
@Override
@@ -1156,17 +898,14 @@
String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
- IOperatorSchema inputSchema = new OperatorSchemaImpl();
+ IOperatorSchema inputSchema;
if (inputSchemas.length > 0) {
inputSchema = inputSchemas[0];
} else {
throw new AlgebricksException("TokenizeOperator can not operate without any input variable.");
}
- Dataset dataset = findDataset(dataverseName, datasetName);
- if (dataset == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName);
- }
+ Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
Index secondaryIndex;
try {
secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
@@ -1174,750 +913,18 @@
} catch (MetadataException e) {
throw new AlgebricksException(e);
}
- AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
// TokenizeOperator only supports a keyword or n-gram index.
switch (secondaryIndex.getIndexType()) {
case SINGLE_PARTITION_WORD_INVIX:
case SINGLE_PARTITION_NGRAM_INVIX:
case LENGTH_PARTITIONED_WORD_INVIX:
- case LENGTH_PARTITIONED_NGRAM_INVIX: {
+ case LENGTH_PARTITIONED_NGRAM_INVIX:
return getBinaryTokenizerRuntime(dataverseName, datasetName, indexName, inputSchema, propagatedSchema,
- typeEnv, primaryKeys, secondaryKeys, filterFactory, recordDesc, context, spec,
- IndexOperation.INSERT, secondaryIndex.getIndexType(), bulkload);
- }
- default: {
+ primaryKeys, secondaryKeys, recordDesc, spec, secondaryIndex.getIndexType());
+ default:
throw new AlgebricksException("Currently, we do not support TokenizeOperator for the index type: "
+ secondaryIndex.getIndexType());
- }
}
-
- }
-
-    // Get a Tokenizer for bulk-loading data into an n-gram or keyword index.
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBinaryTokenizerRuntime(String dataverseName,
- String datasetName, String indexName, IOperatorSchema inputSchema, IOperatorSchema propagatedSchema,
- IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
- AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context,
- JobSpecification spec, IndexOperation indexOp, IndexType indexType, boolean bulkload)
- throws AlgebricksException {
-
- // Sanity checks.
- if (primaryKeys.size() > 1) {
- throw new AlgebricksException("Cannot tokenize composite primary key.");
- }
- if (secondaryKeys.size() > 1) {
- throw new AlgebricksException("Cannot tokenize composite secondary key fields.");
- }
-
- boolean isPartitioned;
- if (indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
- || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
- isPartitioned = true;
- } else {
- isPartitioned = false;
- }
-
- // Number of Keys that needs to be propagated
- int numKeys = inputSchema.getSize();
-
- // Get the rest of Logical Variables that are not (PK or SK) and each
- // variable's positions.
- // These variables will be propagated through TokenizeOperator.
- List<LogicalVariable> otherKeys = new ArrayList<LogicalVariable>();
- if (inputSchema.getSize() > 0) {
- for (int k = 0; k < inputSchema.getSize(); k++) {
- boolean found = false;
- for (LogicalVariable varKey : primaryKeys) {
- if (varKey.equals(inputSchema.getVariable(k))) {
- found = true;
- break;
- } else {
- found = false;
- }
- }
- if (!found) {
- for (LogicalVariable varKey : secondaryKeys) {
- if (varKey.equals(inputSchema.getVariable(k))) {
- found = true;
- break;
- } else {
- found = false;
- }
- }
- }
- if (!found) {
- otherKeys.add(inputSchema.getVariable(k));
- }
- }
- }
-
- // For tokenization, sorting and loading.
- // One token (+ optional partitioning field) + primary keys + secondary
- // keys + other variables
- // secondary keys and other variables will be just passed to the
- // IndexInsertDelete Operator.
- int numTokenKeyPairFields = (!isPartitioned) ? 1 + numKeys : 2 + numKeys;
-
- // generate field permutations for the input
- int[] fieldPermutation = new int[numKeys];
-
- int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
- int i = 0;
- int j = 0;
- for (LogicalVariable varKey : primaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- modificationCallbackPrimaryKeyFields[j] = i;
- i++;
- j++;
- }
- for (LogicalVariable varKey : otherKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- i++;
- }
- for (LogicalVariable varKey : secondaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- i++;
- }
-
- Dataset dataset = findDataset(dataverseName, datasetName);
- if (dataset == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
- }
- String itemTypeName = dataset.getItemTypeName();
- IAType itemType;
- try {
- itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName)
- .getDatatype();
-
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Only record types can be tokenized.");
- }
-
- ARecordType recType = (ARecordType) itemType;
-
- // Index parameters.
- Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
- dataset.getDatasetName(), indexName);
-
- List<List<String>> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
- List<IAType> secondaryKeyTypeEntries = secondaryIndex.getKeyFieldTypes();
-
- int numTokenFields = (!isPartitioned) ? secondaryKeys.size() : secondaryKeys.size() + 1;
- ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenFields];
- ITypeTraits[] invListsTypeTraits = new ITypeTraits[primaryKeys.size()];
-
- // Find the key type of the secondary key. If it's a derived type,
- // return the derived type.
- // e.g. UNORDERED LIST -> return UNORDERED LIST type
- IAType secondaryKeyType = null;
- Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypeEntries.get(0),
- secondaryKeyExprs.get(0), recType);
- secondaryKeyType = keyPairType.first;
- List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
- i = 0;
- for (List<String> partitioningKey : partitioningKeys) {
- IAType keyType = recType.getSubFieldType(partitioningKey);
- invListsTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- ++i;
- }
-
- tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
- if (isPartitioned) {
- // The partitioning field is hardcoded to be a short *without*
- // an Asterix type tag.
- tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS;
- }
-
- IBinaryTokenizerFactory tokenizerFactory = NonTaggedFormatUtil.getBinaryTokenizerFactory(
- secondaryKeyType.getTypeTag(), indexType, secondaryIndex.getGramLength());
-
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- splitProviderAndPartitionConstraintsForDataset(
- dataverseName, datasetName, indexName, dataset.getDatasetDetails().isTemp());
-
- // Generate Output Record format
- ISerializerDeserializer<?>[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
- ITypeTraits[] tokenKeyPairTypeTraits = new ITypeTraits[numTokenKeyPairFields];
- ISerializerDeserializerProvider serdeProvider = FormatUtils.getDefaultFormat().getSerdeProvider();
-
- // The order of the output record: propagated variables (including
- // PK and SK), token, and number of token.
- // #1. propagate all input variables
- for (int k = 0; k < recordDesc.getFieldCount(); k++) {
- tokenKeyPairFields[k] = recordDesc.getFields()[k];
- tokenKeyPairTypeTraits[k] = recordDesc.getTypeTraits()[k];
- }
- int tokenOffset = recordDesc.getFieldCount();
-
- // #2. Specify the token type
- tokenKeyPairFields[tokenOffset] = serdeProvider.getSerializerDeserializer(secondaryKeyType);
- tokenKeyPairTypeTraits[tokenOffset] = tokenTypeTraits[0];
- tokenOffset++;
-
- // #3. Specify the length-partitioning key: number of token
- if (isPartitioned) {
- tokenKeyPairFields[tokenOffset] = ShortSerializerDeserializer.INSTANCE;
- tokenKeyPairTypeTraits[tokenOffset] = tokenTypeTraits[1];
- }
-
- RecordDescriptor tokenKeyPairRecDesc = new RecordDescriptor(tokenKeyPairFields, tokenKeyPairTypeTraits);
- IOperatorDescriptor tokenizerOp;
-
- // Keys to be tokenized : SK
- int docField = fieldPermutation[fieldPermutation.length - 1];
-
- // Keys to be propagated
- int[] keyFields = new int[numKeys];
- for (int k = 0; k < keyFields.length; k++) {
- keyFields[k] = k;
- }
-
- tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec, tokenKeyPairRecDesc, tokenizerFactory, docField,
- keyFields, isPartitioned, true);
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(tokenizerOp,
- splitsAndConstraint.second);
-
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
- IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
- IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
- ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
- throws AlgebricksException {
- return getIndexInsertOrDeleteRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema, inputSchemas,
- typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc, context, spec,
- false);
- }
-
- private AsterixTupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
- IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context)
- throws AlgebricksException {
- // No filtering condition.
- if (filterExpr == null) {
- return null;
- }
- IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
- IScalarEvaluatorFactory filterEvalFactory = expressionRuntimeProvider.createEvaluatorFactory(filterExpr,
- typeEnv, inputSchemas, context);
- return new AsterixTupleFilterFactory(filterEvalFactory, context.getBinaryBooleanInspectorFactory());
- }
-
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeDmlRuntime(String dataverseName,
- String datasetName, String indexName, IOperatorSchema propagatedSchema,
- List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
- List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory,
- RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, IndexOperation indexOp,
- boolean bulkload) throws AlgebricksException {
-
- Dataset dataset = findDataset(dataverseName, datasetName);
- if (dataset == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
- }
- boolean temp = dataset.getDatasetDetails().isTemp();
- isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
-
- int numKeys = primaryKeys.size() + secondaryKeys.size();
- int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
-
- // generate field permutations
- int[] fieldPermutation = new int[numKeys + numFilterFields];
- int[] bloomFilterKeyFields = new int[secondaryKeys.size()];
- int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
- int i = 0;
- int j = 0;
- for (LogicalVariable varKey : secondaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- bloomFilterKeyFields[i] = i;
- i++;
- }
- for (LogicalVariable varKey : primaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- modificationCallbackPrimaryKeyFields[j] = i;
- i++;
- j++;
- }
- if (numFilterFields > 0) {
- int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
- fieldPermutation[numKeys] = idx;
- }
-
- String itemTypeName = dataset.getItemTypeName();
- IAType itemType;
- try {
- itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName)
- .getDatatype();
-
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Only record types can be indexed.");
- }
-
- ARecordType recType = (ARecordType) itemType;
-
- // Index parameters.
- Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
- dataset.getDatasetName(), indexName);
-
- ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
- IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
- recType, context.getBinaryComparatorFactoryProvider());
- int[] filterFields = null;
- int[] btreeFields = null;
- if (filterTypeTraits != null) {
- filterFields = new int[1];
- filterFields[0] = numKeys;
- btreeFields = new int[numKeys];
- for (int k = 0; k < btreeFields.length; k++) {
- btreeFields[k] = k;
- }
- }
-
- List<List<String>> secondaryKeyNames = secondaryIndex.getKeyFieldNames();
- List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
- ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
- for (i = 0; i < secondaryKeys.size(); ++i) {
- Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
- secondaryKeyNames.get(i), recType);
- IAType keyType = keyPairType.first;
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
- true);
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- }
- List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
- for (List<String> partitioningKey : partitioningKeys) {
- IAType keyType = recType.getSubFieldType(partitioningKey);
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
- true);
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- ++i;
- }
-
- IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- splitProviderAndPartitionConstraintsForDataset(
- dataverseName, datasetName, indexName, temp);
-
- // prepare callback
- JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
- int datasetId = dataset.getDatasetId();
- TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
- IModificationOperationCallbackFactory modificationCallbackFactory = temp
- ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
- : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE,
- dataset.hasMetaPart());
-
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
- .getMergePolicyFactory(dataset, mdTxnCtx);
- IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
- new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
- new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
- storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, filterCmpFactories,
- btreeFields, filterFields, !temp);
- IOperatorDescriptor op;
- if (bulkload) {
- long numElementsHint = getCardinalityPerPartitionHint(dataset);
- op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
- appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
- comparatorFactories, bloomFilterKeyFields, fieldPermutation,
- GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh);
- } else {
- op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
- appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
- splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
- fieldPermutation, indexOp,
- new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
- compactionInfo.first, compactionInfo.second,
- new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMBTreeIOOperationCallbackFactory.INSTANCE,
- storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits,
- filterCmpFactories, btreeFields, filterFields, !temp),
- filterFactory, false, indexName, null, modificationCallbackFactory,
- NoOpOperationCallbackFactory.INSTANCE);
- }
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
- }
-
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInvertedIndexDmlRuntime(String dataverseName,
- String datasetName, String indexName, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
- List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
- List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory,
- RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, IndexOperation indexOp,
- IndexType indexType, boolean bulkload) throws AlgebricksException {
-
- // Check the index is length-partitioned or not.
- boolean isPartitioned;
- if (indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
- || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
- isPartitioned = true;
- } else {
- isPartitioned = false;
- }
-
- // Sanity checks.
- if (primaryKeys.size() > 1) {
- throw new AlgebricksException("Cannot create inverted index on dataset with composite primary key.");
- }
- // The size of secondaryKeys can be two if it receives input from its
- // TokenizeOperator- [token, number of token]
- if (secondaryKeys.size() > 1 && !isPartitioned) {
- throw new AlgebricksException("Cannot create composite inverted index on multiple fields.");
- } else if (secondaryKeys.size() > 2 && isPartitioned) {
- throw new AlgebricksException("Cannot create composite inverted index on multiple fields.");
- }
-
- Dataset dataset = findDataset(dataverseName, datasetName);
- if (dataset == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
- }
- boolean temp = dataset.getDatasetDetails().isTemp();
- isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
-
- // For tokenization, sorting and loading.
- // One token (+ optional partitioning field) + primary keys: [token,
- // number of token, PK]
- int numKeys = primaryKeys.size() + secondaryKeys.size();
- int numTokenKeyPairFields = (!isPartitioned) ? 1 + primaryKeys.size() : 2 + primaryKeys.size();
- int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
-
- // generate field permutations
- int[] fieldPermutation = new int[numKeys + numFilterFields];
- int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
- int i = 0;
- int j = 0;
-
- // If the index is partitioned: [token, number of token]
- // Otherwise: [token]
- for (LogicalVariable varKey : secondaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- i++;
- }
- for (LogicalVariable varKey : primaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- modificationCallbackPrimaryKeyFields[j] = i;
- i++;
- j++;
- }
- if (numFilterFields > 0) {
- int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
- fieldPermutation[numKeys] = idx;
- }
-
- String itemTypeName = dataset.getItemTypeName();
- IAType itemType;
- try {
- itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName)
- .getDatatype();
-
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Only record types can be indexed.");
- }
-
- ARecordType recType = (ARecordType) itemType;
-
- // Index parameters.
- Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
- dataset.getDatasetName(), indexName);
-
- List<List<String>> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
- List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
-
- int numTokenFields = 0;
-
- // SecondaryKeys.size() can be two if it comes from the bulkload.
- // In this case, [token, number of token] are the secondaryKeys.
- if (!isPartitioned || (secondaryKeys.size() > 1)) {
- numTokenFields = secondaryKeys.size();
- } else if (isPartitioned && (secondaryKeys.size() == 1)) {
- numTokenFields = secondaryKeys.size() + 1;
- }
-
- ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
- ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenFields];
- ITypeTraits[] invListsTypeTraits = new ITypeTraits[primaryKeys.size()];
- IBinaryComparatorFactory[] tokenComparatorFactories = new IBinaryComparatorFactory[numTokenFields];
- IBinaryComparatorFactory[] invListComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
- dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider());
-
- IAType secondaryKeyType = null;
-
- Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
- secondaryKeyExprs.get(0), recType);
- secondaryKeyType = keyPairType.first;
-
- List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
-
- i = 0;
- for (List<String> partitioningKey : partitioningKeys) {
- IAType keyType = recType.getSubFieldType(partitioningKey);
- invListsTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- ++i;
- }
-
- tokenComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
- tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
- if (isPartitioned) {
- // The partitioning field is hardcoded to be a short *without*
- // an Asterix type tag.
- tokenComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
- tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS;
- }
- IBinaryTokenizerFactory tokenizerFactory = NonTaggedFormatUtil.getBinaryTokenizerFactory(
- secondaryKeyType.getTypeTag(), indexType, secondaryIndex.getGramLength());
-
- ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
- IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
- recType, context.getBinaryComparatorFactoryProvider());
-
- int[] filterFields = null;
- int[] invertedIndexFields = null;
- int[] filterFieldsForNonBulkLoadOps = null;
- int[] invertedIndexFieldsForNonBulkLoadOps = null;
- if (filterTypeTraits != null) {
- filterFields = new int[1];
- filterFields[0] = numTokenFields + primaryKeys.size();
- invertedIndexFields = new int[numTokenFields + primaryKeys.size()];
- for (int k = 0; k < invertedIndexFields.length; k++) {
- invertedIndexFields[k] = k;
- }
-
- filterFieldsForNonBulkLoadOps = new int[numFilterFields];
- filterFieldsForNonBulkLoadOps[0] = numTokenKeyPairFields;
- invertedIndexFieldsForNonBulkLoadOps = new int[numTokenKeyPairFields];
- for (int k = 0; k < invertedIndexFieldsForNonBulkLoadOps.length; k++) {
- invertedIndexFieldsForNonBulkLoadOps[k] = k;
- }
- }
-
- IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- splitProviderAndPartitionConstraintsForDataset(
- dataverseName, datasetName, indexName, temp);
-
- // prepare callback
- JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
- int datasetId = dataset.getDatasetId();
- TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
- IModificationOperationCallbackFactory modificationCallbackFactory = temp
- ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
- ResourceType.LSM_INVERTED_INDEX)
- : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
- ResourceType.LSM_INVERTED_INDEX, dataset.hasMetaPart());
-
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
- .getMergePolicyFactory(dataset, mdTxnCtx);
- IIndexDataflowHelperFactory indexDataFlowFactory;
- if (!isPartitioned) {
- indexDataFlowFactory = new LSMInvertedIndexDataflowHelperFactory(
- new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
- new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
- storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
- filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
- invertedIndexFieldsForNonBulkLoadOps, !temp);
- } else {
- indexDataFlowFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
- new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
- compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
- storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
- filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
- invertedIndexFieldsForNonBulkLoadOps, !temp);
- }
- IOperatorDescriptor op;
- if (bulkload) {
- long numElementsHint = getCardinalityPerPartitionHint(dataset);
- op = new LSMInvertedIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation, false,
- numElementsHint, false, appContext.getStorageManagerInterface(), splitsAndConstraint.first,
- appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
- invListsTypeTraits, invListComparatorFactories, tokenizerFactory, indexDataFlowFactory);
- } else {
- op = new AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor(spec, recordDesc,
- appContext.getStorageManagerInterface(), splitsAndConstraint.first,
- appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
- invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
- indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName);
- }
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
- }
-
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeDmlRuntime(String dataverseName,
- String datasetName, String indexName, IOperatorSchema propagatedSchema,
- List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
- List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory,
- RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, IndexOperation indexOp,
- boolean bulkload) throws AlgebricksException {
- try {
- Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-
- boolean temp = dataset.getDatasetDetails().isTemp();
- isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
-
- String itemTypeName = dataset.getItemTypeName();
- IAType itemType = MetadataManager.INSTANCE
- .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Only record types can be indexed.");
- }
- ARecordType recType = (ARecordType) itemType;
- Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
- dataset.getDatasetName(), indexName);
- List<List<String>> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
- List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
- Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
- secondaryKeyExprs.get(0), recType);
- IAType spatialType = keyPairType.first;
- boolean isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT
- || spatialType.getTypeTag() == ATypeTag.POINT3D;
- int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
- int numSecondaryKeys = dimension * 2;
- int numPrimaryKeys = primaryKeys.size();
- int numKeys = numSecondaryKeys + numPrimaryKeys;
- ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys];
-
- int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
- int[] fieldPermutation = new int[numKeys + numFilterFields];
- int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
- int i = 0;
- int j = 0;
-
- for (LogicalVariable varKey : secondaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- i++;
- }
- for (LogicalVariable varKey : primaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- modificationCallbackPrimaryKeyFields[j] = i;
- i++;
- j++;
- }
-
- if (numFilterFields > 0) {
- int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
- fieldPermutation[numKeys] = idx;
- }
- IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
- IPrimitiveValueProviderFactory[] valueProviderFactories =
- new IPrimitiveValueProviderFactory[numSecondaryKeys];
- for (i = 0; i < numSecondaryKeys; i++) {
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
- .getBinaryComparatorFactory(nestedKeyType, true);
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
- valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
- }
- List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
- for (List<String> partitioningKey : partitioningKeys) {
- IAType keyType = recType.getSubFieldType(partitioningKey);
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- ++i;
- }
- ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
- IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
- dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider());
- IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- splitProviderAndPartitionConstraintsForDataset(
- dataverseName, datasetName, indexName, temp);
- int[] btreeFields = new int[primaryComparatorFactories.length];
- for (int k = 0; k < btreeFields.length; k++) {
- btreeFields[k] = k + numSecondaryKeys;
- }
-
- ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
- IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
- recType, context.getBinaryComparatorFactoryProvider());
- int[] filterFields = null;
- int[] rtreeFields = null;
- if (filterTypeTraits != null) {
- filterFields = new int[1];
- filterFields[0] = numSecondaryKeys + numPrimaryKeys;
- rtreeFields = new int[numSecondaryKeys + numPrimaryKeys];
- for (int k = 0; k < rtreeFields.length; k++) {
- rtreeFields[k] = k;
- }
- }
-
- // prepare callback
- JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
- int datasetId = dataset.getDatasetId();
- TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
- IModificationOperationCallbackFactory modificationCallbackFactory = temp
- ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider,
- indexOp, ResourceType.LSM_RTREE)
- : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider,
- indexOp, ResourceType.LSM_RTREE,
- dataset.hasMetaPart());
-
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
- .getMergePolicyFactory(dataset, mdTxnCtx);
-
- IBinaryComparatorFactory[] deletedKeyBTreeCompFactories = getMergedComparatorFactories(comparatorFactories,
- primaryComparatorFactories);
- IIndexDataflowHelperFactory idff = new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(
- valueProviderFactories, RTreePolicyType.RTREE, deletedKeyBTreeCompFactories,
- new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
- compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
- AqlMetadataProvider.proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
- rtreeFields, filterTypeTraits, filterCmpFactories, filterFields, !temp, isPointMBR);
- IOperatorDescriptor op;
- if (bulkload) {
- long numElementsHint = getCardinalityPerPartitionHint(dataset);
- op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
- appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
- primaryComparatorFactories, btreeFields, fieldPermutation,
- GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idff);
- } else {
- op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
- appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
- splitsAndConstraint.first, typeTraits, comparatorFactories, null, fieldPermutation, indexOp,
- idff, filterFactory, false, indexName, null, modificationCallbackFactory,
- NoOpOperationCallbackFactory.INSTANCE);
- }
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
- } catch (MetadataException e) {
- throw new AlgebricksException(e);
- }
- }
-
- public JobId getJobId() {
- return jobId;
- }
-
- public static ILinearizeComparatorFactory proposeLinearizer(ATypeTag keyType, int numKeyFields)
- throws AlgebricksException {
- return AqlLinearizeComparatorFactoryProvider.INSTANCE.getLinearizeComparatorFactory(keyType, true,
- numKeyFields / 2);
}
/**
@@ -1945,12 +952,48 @@
for (String nd : nodeGroup) {
numPartitions += AsterixClusterProperties.INSTANCE.getNodePartitionsCount(nd);
}
- return numElementsHint /= numPartitions;
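+        // spread the dataset-wide cardinality hint evenly across its partitions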
+ numElementsHint = numElementsHint / numPartitions;
+ return numElementsHint;
}
- @Override
- public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
- return AsterixBuiltinFunctions.lookupFunction(fid);
+ protected IAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName,
+ Map<String, String> configuration, ARecordType itemType, boolean isPKAutoGenerated,
+ List<List<String>> primaryKeys, ARecordType metaType) throws AlgebricksException {
+ try {
+ configuration.put(ExternalDataConstants.KEY_DATAVERSE, dataset.getDataverseName());
+ IAdapterFactory adapterFactory = AdapterFactoryProvider.getAdapterFactory(libraryManager, adapterName,
+ configuration, itemType, metaType);
+
+            // check whether the dataset has a files index, i.e., it is an indexed external dataset
+ Index filesIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+ dataset.getDatasetName(),
+ dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX));
+
+ if (filesIndex != null && filesIndex.getPendingOp() == 0) {
+ // get files
+ List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
+ Iterator<ExternalFile> iterator = files.iterator();
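+                // skip files that still have a pending (add/drop) metadata operation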
+ while (iterator.hasNext()) {
+ if (iterator.next().getPendingOp() != ExternalFilePendingOp.PENDING_NO_OP) {
+ iterator.remove();
+ }
+ }
+ }
+
+ return adapterFactory;
+ } catch (Exception e) {
+ throw new AlgebricksException("Unable to create adapter", e);
+ }
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public static ILinearizeComparatorFactory proposeLinearizer(ATypeTag keyType, int numKeyFields)
+ throws AlgebricksException {
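+        // numKeyFields counts the MBR coordinates (2 per dimension), so numKeyFields / 2 is the dimensionality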
+ return AqlLinearizeComparatorFactoryProvider.INSTANCE.getLinearizeComparatorFactory(keyType, true,
+ numKeyFields / 2);
}
public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataset(
@@ -1971,7 +1014,7 @@
public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, String dataverseName, String adapterName)
throws MetadataException {
- DatasourceAdapter adapter = null;
+ DatasourceAdapter adapter;
// search in default namespace (built-in adapter)
adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
@@ -1982,98 +1025,16 @@
return adapter;
}
- public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException {
- String dv = dataverse == null ? (defaultDataverse == null ? null : defaultDataverse.getDataverseName())
- : dataverse;
- if (dv == null) {
- return null;
- }
- try {
- return MetadataManager.INSTANCE.getDataset(mdTxnCtx, dv, dataset);
- } catch (MetadataException e) {
- throw new AlgebricksException(e);
- }
- }
-
- public INodeDomain findNodeDomain(String nodeGroupName) throws AlgebricksException {
- NodeGroup nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodeGroupName);
- List<String> partitions = new ArrayList<>();
- for (String location : nodeGroup.getNodeNames()) {
- int numPartitions = AsterixClusterProperties.INSTANCE.getNodePartitionsCount(location);
- for (int i = 0; i < numPartitions; i++) {
- partitions.add(location);
- }
- }
- return new DefaultNodeGroupDomain(partitions);
- }
-
- public IAType findType(String dataverse, String typeName) throws AlgebricksException {
- if (dataverse == null || typeName == null) {
- return null;
- }
- Datatype type;
- try {
- type = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverse, typeName);
- } catch (MetadataException e) {
- throw new AlgebricksException(
- "Metadata exception while looking up type '" + typeName + "' in dataverse '" + dataverse + "'", e);
- }
- if (type == null) {
- throw new AlgebricksException("Type name '" + typeName + "' unknown in dataverse '" + dataverse + "'");
- }
- return type.getDatatype();
- }
-
- public Feed findFeed(String dataverse, String feedName) throws AlgebricksException {
- try {
- return MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverse, feedName);
- } catch (MetadataException e) {
- throw new AlgebricksException(e);
- }
- }
-
- public FeedPolicyEntity findFeedPolicy(String dataverse, String policyName) throws AlgebricksException {
- try {
- return MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx, dataverse, policyName);
- } catch (MetadataException e) {
- throw new AlgebricksException(e);
- }
- }
-
- public List<Index> getDatasetIndexes(String dataverseName, String datasetName) throws AlgebricksException {
- try {
- return MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
- } catch (MetadataException e) {
- throw new AlgebricksException(e);
- }
- }
-
public AlgebricksAbsolutePartitionConstraint getClusterLocations() {
return AsterixClusterProperties.INSTANCE.getClusterLocations();
}
- public IDataFormat getFormat() {
- return FormatUtils.getDefaultFormat();
- }
-
public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex(
String dataverseName, String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForFilesIndex(mdTxnCtx, dataverseName,
datasetName, targetIdxName, create);
}
- public AsterixStorageProperties getStorageProperties() {
- return storageProperties;
- }
-
- public Map<String, Integer> getLocks() {
- return locks;
- }
-
- public void setLocks(Map<String, Integer> locks) {
- this.locks = locks;
- }
-
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataLookupRuntime(
JobSpecification jobSpec, Dataset dataset, Index secondaryIndex, int[] ridIndexes, boolean retainInput,
IVariableTypeEnvironment typeEnv, List<LogicalVariable> outputVars, IOperatorSchema opSchema,
@@ -2081,7 +1042,7 @@
throws AlgebricksException {
try {
// Get data type
- IAType itemType = null;
+ IAType itemType;
itemType = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
dataset.getDataverseName(), dataset.getItemTypeName()).getDatatype();
@@ -2123,9 +1084,7 @@
appContext.getStorageManagerInterface(), spPc.first, dataset.getDatasetId(),
metadataProvider.getStorageProperties().getBloomFilterFalsePositiveRate(), searchOpCallbackFactory,
retainMissing, context.getMissingWriterFactory());
-
- // Return value
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, spPc.second);
+ return new Pair<>(op, spPc.second);
} catch (Exception e) {
throw new AlgebricksException(e);
}
@@ -2149,7 +1108,7 @@
int numKeys = primaryKeys.size();
int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
int numOfAdditionalFields = additionalNonFilterFields == null ? 0 : additionalNonFilterFields.size();
- // Move key fields to front. {keys, record, filters}
+ // Move key fields to front. [keys, record, filters]
int[] fieldPermutation = new int[numKeys + 1 + numFilterFields + numOfAdditionalFields];
int[] bloomFilterKeyFields = new int[numKeys];
int i = 0;
@@ -2190,8 +1149,8 @@
IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- splitProviderAndPartitionConstraintsForDataset(
- dataSource.getId().getDataverseName(), datasetName, indexName, temp);
+ splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(), datasetName,
+ indexName, temp);
// prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -2277,23 +1236,192 @@
}
}
- // TODO refactor this method
- @Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
- IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
- IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalFilteringKeys,
- ILogicalExpression filterExpr, List<LogicalVariable> prevSecondaryKeys,
- LogicalVariable prevAdditionalFilteringKey, RecordDescriptor recordDesc, JobGenContext context,
- JobSpecification spec) throws AlgebricksException {
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime(
+ JobSpecification jobSpec, IAType itemType, IAdapterFactory adapterFactory, IDataFormat format)
+ throws AlgebricksException {
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException("Can only scan datasets of records.");
+ }
+
+ ISerializerDeserializer<?> payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
+ RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
+
+ ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec, scannerDesc,
+ adapterFactory);
+
+ AlgebricksPartitionConstraint constraint;
+ try {
+ constraint = adapterFactory.getPartitionConstraint();
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
+
+ return new Pair<>(dataScanner, constraint);
+ }
+
+ private Pair<IBinaryComparatorFactory[], ITypeTraits[]> getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(
+ List<List<String>> sidxKeyFieldNames, List<IAType> sidxKeyFieldTypes, List<List<String>> pidxKeyFieldNames,
+ ARecordType recType, DatasetType dsType, boolean hasMeta, List<Integer> primaryIndexKeyIndicators,
+ List<Integer> secondaryIndexIndicators, ARecordType metaType) throws AlgebricksException {
+
+ IBinaryComparatorFactory[] comparatorFactories;
+ ITypeTraits[] typeTraits;
+ int sidxKeyFieldCount = sidxKeyFieldNames.size();
+ int pidxKeyFieldCount = pidxKeyFieldNames.size();
+ typeTraits = new ITypeTraits[sidxKeyFieldCount + pidxKeyFieldCount];
+ comparatorFactories = new IBinaryComparatorFactory[sidxKeyFieldCount + pidxKeyFieldCount];
+
+ int i = 0;
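+        // secondary-index key fields come first, followed by the primary-index key fields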
+ for (; i < sidxKeyFieldCount; ++i) {
+ Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i),
+ sidxKeyFieldNames.get(i),
+ (hasMeta && secondaryIndexIndicators.get(i).intValue() == 1) ? metaType : recType);
+ IAType keyType = keyPairType.first;
+ comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+ true);
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ }
+
+ for (int j = 0; j < pidxKeyFieldCount; ++j, ++i) {
+ IAType keyType = null;
+ try {
+ switch (dsType) {
+ case INTERNAL:
+ keyType = (hasMeta && primaryIndexKeyIndicators.get(j).intValue() == 1)
+ ? metaType.getSubFieldType(pidxKeyFieldNames.get(j))
+ : recType.getSubFieldType(pidxKeyFieldNames.get(j));
+ break;
+ case EXTERNAL:
+ keyType = IndexingConstants.getFieldType(j);
+ break;
+ default:
+ throw new AlgebricksException("Unknown Dataset Type");
+ }
+ } catch (AsterixException e) {
+ throw new AlgebricksException(e);
+ }
+ comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+ true);
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ }
+
+ return new Pair<>(comparatorFactories, typeTraits);
+ }
+
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertOrDeleteRuntime(IndexOperation indexOp,
+ IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
+ LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields, RecordDescriptor recordDesc,
+ JobGenContext context, JobSpecification spec, boolean bulkload,
+ List<LogicalVariable> additionalNonFilteringFields) throws AlgebricksException {
+
+ String datasetName = dataSource.getId().getDatasourceName();
+ Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataSource.getId().getDataverseName(),
+ datasetName);
+ boolean temp = dataset.getDatasetDetails().isTemp();
+ isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+
+ int numKeys = keys.size();
+ int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
+ // Move key fields to front.
+ int[] fieldPermutation = new int[numKeys + 1 + numFilterFields
+ + (additionalNonFilteringFields == null ? 0 : additionalNonFilteringFields.size())];
+ int[] bloomFilterKeyFields = new int[numKeys];
+ int i = 0;
+ for (LogicalVariable varKey : keys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ bloomFilterKeyFields[i] = i;
+ i++;
+ }
+ fieldPermutation[i++] = propagatedSchema.findVariable(payload);
+ if (numFilterFields > 0) {
+ int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
+ fieldPermutation[i++] = idx;
+ }
+ if (additionalNonFilteringFields != null) {
+ for (LogicalVariable variable : additionalNonFilteringFields) {
+ int idx = propagatedSchema.findVariable(variable);
+ fieldPermutation[i++] = idx;
+ }
+ }
+
+ try {
+ Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+ dataset.getDatasetName(), dataset.getDatasetName());
+ String indexName = primaryIndex.getIndexName();
+ ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
+ .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), dataset.getItemTypeName()).getDatatype();
+ ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
+ ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
+
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+ IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
+ itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+ splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(), datasetName,
+ indexName, temp);
+
+ // prepare callback
+ JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
+ int datasetId = dataset.getDatasetId();
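+            // positions of the primary keys in the tuple (keys were moved to the front), used by the callbacks below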
+ int[] primaryKeyFields = new int[numKeys];
+ for (i = 0; i < numKeys; i++) {
+ primaryKeyFields[i] = i;
+ }
+
+ ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+ itemType, context.getBinaryComparatorFactoryProvider());
+ int[] filterFields = DatasetUtils.createFilterFields(dataset);
+ int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
+
+ TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+ IModificationOperationCallbackFactory modificationCallbackFactory = temp
+ ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
+ : new PrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
+ txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE, dataset.hasMetaPart());
+
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+ .getMergePolicyFactory(dataset, mdTxnCtx);
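+            // dataflow helper for the primary LSM BTree; note the primary-index operation tracker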
+ IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
+ new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
+ new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+ storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
+ btreeFields, filterFields, !temp);
+ IOperatorDescriptor op;
+ if (bulkload) {
+ long numElementsHint = getCardinalityPerPartitionHint(dataset);
+ op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
+ appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
+ comparatorFactories, bloomFilterKeyFields, fieldPermutation,
+ GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh);
+ } else {
+ op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
+ appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+ splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
+ fieldPermutation, indexOp, idfh, null, true, indexName, null, modificationCallbackFactory,
+ NoOpOperationCallbackFactory.INSTANCE);
+ }
+ return new Pair<>(op, splitsAndConstraint.second);
+ } catch (MetadataException me) {
+ throw new AlgebricksException(me);
+ }
+ }
+
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertOrDeleteOrUpsertRuntime(
+ IndexOperation indexOp, IDataSourceIndex<String, AqlSourceId> dataSourceIndex,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
+ List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+ List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
+ JobGenContext context, JobSpecification spec, boolean bulkload, List<LogicalVariable> prevSecondaryKeys,
+ LogicalVariable prevAdditionalFilteringKey) throws AlgebricksException {
String indexName = dataSourceIndex.getId();
String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
- Dataset dataset = findDataset(dataverseName, datasetName);
- if (dataset == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName);
- }
+ Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
Index secondaryIndex;
try {
secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
@@ -2301,44 +1429,374 @@
} catch (MetadataException e) {
throw new AlgebricksException(e);
}
- AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
+
ArrayList<LogicalVariable> prevAdditionalFilteringKeys = null;
- if (prevAdditionalFilteringKey != null) {
+ if (indexOp == IndexOperation.UPSERT && prevAdditionalFilteringKey != null) {
prevAdditionalFilteringKeys = new ArrayList<>();
prevAdditionalFilteringKeys.add(prevAdditionalFilteringKey);
}
+ AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
switch (secondaryIndex.getIndexType()) {
- case BTREE: {
- return getBTreeUpsertRuntime(dataverseName, datasetName, indexName, propagatedSchema, typeEnv,
- primaryKeys, secondaryKeys, additionalFilteringKeys, filterFactory, recordDesc, context, spec,
- prevSecondaryKeys, prevAdditionalFilteringKeys);
- }
- case RTREE: {
- return getRTreeUpsertRuntime(dataverseName, datasetName, indexName, propagatedSchema, typeEnv,
- primaryKeys, secondaryKeys, additionalFilteringKeys, filterFactory, recordDesc, context, spec,
- prevSecondaryKeys, prevAdditionalFilteringKeys);
- }
+ case BTREE:
+ return getBTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
+ secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
+ bulkload, prevSecondaryKeys, prevAdditionalFilteringKeys);
+ case RTREE:
+ return getRTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
+ secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
+ bulkload, prevSecondaryKeys, prevAdditionalFilteringKeys);
case SINGLE_PARTITION_WORD_INVIX:
case SINGLE_PARTITION_NGRAM_INVIX:
case LENGTH_PARTITIONED_WORD_INVIX:
- case LENGTH_PARTITIONED_NGRAM_INVIX: {
- return getInvertedIndexUpsertRuntime(dataverseName, datasetName, indexName, propagatedSchema,
- primaryKeys, secondaryKeys, additionalFilteringKeys, filterFactory, recordDesc, context, spec,
- secondaryIndex.getIndexType(), prevSecondaryKeys, prevAdditionalFilteringKeys);
- }
- default: {
+ case LENGTH_PARTITIONED_NGRAM_INVIX:
+ return getInvertedIndexRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
+ secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
+ secondaryIndex.getIndexType(), bulkload, prevSecondaryKeys, prevAdditionalFilteringKeys);
+ default:
throw new AlgebricksException(
- "upsert is not implemented for index type: " + secondaryIndex.getIndexType());
- }
+                            indexOp.name() + " not implemented for index type: "
+ + secondaryIndex.getIndexType());
}
}
- //TODO: refactor this method
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInvertedIndexUpsertRuntime(String dataverseName,
- String datasetName, String indexName, IOperatorSchema propagatedSchema,
- List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
- List<LogicalVariable> additionalFilteringKeys, AsterixTupleFilterFactory filterFactory,
- RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, IndexType indexType,
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeRuntime(String dataverseName,
+ String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
+ List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
+ AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context,
+ JobSpecification spec, IndexOperation indexOp, boolean bulkload, List<LogicalVariable> prevSecondaryKeys,
+ List<LogicalVariable> prevAdditionalFilteringKeys) throws AlgebricksException {
+ Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
+ boolean temp = dataset.getDatasetDetails().isTemp();
+ isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+
+ int numKeys = primaryKeys.size() + secondaryKeys.size();
+ int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
+
+ // generate field permutations
+ int[] fieldPermutation = new int[numKeys + numFilterFields];
+ int[] bloomFilterKeyFields = new int[secondaryKeys.size()];
+ int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
+ int i = 0;
+ int j = 0;
+ for (LogicalVariable varKey : secondaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ bloomFilterKeyFields[i] = i;
+ i++;
+ }
+ for (LogicalVariable varKey : primaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ modificationCallbackPrimaryKeyFields[j] = i;
+ i++;
+ j++;
+ }
+
+ if (numFilterFields > 0) {
+ int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
+ fieldPermutation[numKeys] = idx;
+ }
+
+ int[] prevFieldPermutation = null;
+ if (indexOp == IndexOperation.UPSERT) {
+ // generate field permutations for prev record
+ prevFieldPermutation = new int[numKeys + numFilterFields];
+ int k = 0;
+ for (LogicalVariable varKey : prevSecondaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ prevFieldPermutation[k] = idx;
+ k++;
+ }
+ for (LogicalVariable varKey : primaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ prevFieldPermutation[k] = idx;
+ k++;
+ }
+ // Filter can only be one field!
+ if (numFilterFields > 0) {
+ int idx = propagatedSchema.findVariable(prevAdditionalFilteringKeys.get(0));
+ prevFieldPermutation[numKeys] = idx;
+ }
+ }
+
+ String itemTypeName = dataset.getItemTypeName();
+ IAType itemType;
+ try {
+ itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName)
+ .getDatatype();
+ validateRecordType(itemType);
+ ARecordType recType = (ARecordType) itemType;
+
+ // Index parameters.
+ Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+ dataset.getDatasetName(), indexName);
+
+ ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+ recType, context.getBinaryComparatorFactoryProvider());
+ int[] filterFields = null;
+ int[] btreeFields = null;
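+            // if the dataset has a filter, its value is appended after the secondary and primary key fields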
+ if (filterTypeTraits != null) {
+ filterFields = new int[1];
+ filterFields[0] = numKeys;
+ btreeFields = new int[numKeys];
+ for (int k = 0; k < btreeFields.length; k++) {
+ btreeFields[k] = k;
+ }
+ }
+
+ List<List<String>> secondaryKeyNames = secondaryIndex.getKeyFieldNames();
+ List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
+ ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
+ for (i = 0; i < secondaryKeys.size(); ++i) {
+ Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
+ secondaryKeyNames.get(i), recType);
+ IAType keyType = keyPairType.first;
+ comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+ true);
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ }
+ List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+ for (List<String> partitioningKey : partitioningKeys) {
+ IAType keyType = recType.getSubFieldType(partitioningKey);
+ comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+ true);
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ ++i;
+ }
+
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+ splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp);
+
+ // prepare callback
+ JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
+ int datasetId = dataset.getDatasetId();
+ TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+ IModificationOperationCallbackFactory modificationCallbackFactory = temp
+ ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
+ : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE,
+ dataset.hasMetaPart());
+
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+ .getMergePolicyFactory(dataset, mdTxnCtx);
+ IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
+ new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+ storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, filterCmpFactories,
+ btreeFields, filterFields, !temp);
+ IOperatorDescriptor op;
+ if (bulkload) {
+ long numElementsHint = getCardinalityPerPartitionHint(dataset);
+ op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
+ appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
+ comparatorFactories, bloomFilterKeyFields, fieldPermutation,
+ GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh);
+ } else if (indexOp == IndexOperation.UPSERT) {
+ op = new AsterixLSMTreeUpsertOperatorDescriptor(spec, recordDesc,
+ appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+ splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
+ fieldPermutation, idfh, filterFactory, false, indexName, null, modificationCallbackFactory,
+ NoOpOperationCallbackFactory.INSTANCE, prevFieldPermutation);
+ } else {
+ op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
+ appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+ splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
+ fieldPermutation, indexOp,
+ new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
+ compactionInfo.first, compactionInfo.second,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMBTreeIOOperationCallbackFactory.INSTANCE,
+ storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits,
+ filterCmpFactories, btreeFields, filterFields, !temp),
+ filterFactory, false, indexName, null, modificationCallbackFactory,
+ NoOpOperationCallbackFactory.INSTANCE);
+ }
+ return new Pair<>(op, splitsAndConstraint.second);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeRuntime(String dataverseName,
+ String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
+ List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
+ AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context,
+ JobSpecification spec, IndexOperation indexOp, boolean bulkload, List<LogicalVariable> prevSecondaryKeys,
+ List<LogicalVariable> prevAdditionalFilteringKeys) throws AlgebricksException {
+
+ try {
+ Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
+ boolean temp = dataset.getDatasetDetails().isTemp();
+ isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+
+ String itemTypeName = dataset.getItemTypeName();
+ IAType itemType = MetadataManager.INSTANCE
+ .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
+ validateRecordType(itemType);
+ ARecordType recType = (ARecordType) itemType;
+ Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+ dataset.getDatasetName(), indexName);
+ List<List<String>> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
+ List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
+ Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
+ secondaryKeyExprs.get(0), recType);
+ IAType spatialType = keyPairType.first;
+ boolean isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT
+ || spatialType.getTypeTag() == ATypeTag.POINT3D;
+ int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
+ int numSecondaryKeys = dimension * 2;
+ int numPrimaryKeys = primaryKeys.size();
+ int numKeys = numSecondaryKeys + numPrimaryKeys;
+ ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys];
+
+ int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
+ int[] fieldPermutation = new int[numKeys + numFilterFields];
+ int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
+ int i = 0;
+ int j = 0;
+
+ for (LogicalVariable varKey : secondaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ i++;
+ }
+ for (LogicalVariable varKey : primaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ modificationCallbackPrimaryKeyFields[j] = i;
+ i++;
+ j++;
+ }
+
+ if (numFilterFields > 0) {
+ int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
+ fieldPermutation[numKeys] = idx;
+ }
+
+ int[] prevFieldPermutation = null;
+ if (indexOp == IndexOperation.UPSERT) {
+ // Get field permutation for previous value
+ prevFieldPermutation = new int[numKeys + numFilterFields];
+ i = 0;
+
+                // Secondary keys of the previous record
+ for (LogicalVariable varKey : prevSecondaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ prevFieldPermutation[i] = idx;
+ i++;
+ }
+ for (int k = 0; k < numPrimaryKeys; k++) {
+ prevFieldPermutation[k + i] = fieldPermutation[k + i];
+ i++;
+ }
+
+ if (numFilterFields > 0) {
+ int idx = propagatedSchema.findVariable(prevAdditionalFilteringKeys.get(0));
+ prevFieldPermutation[numKeys] = idx;
+ }
+ }
+
+ IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
+ IPrimitiveValueProviderFactory[] valueProviderFactories =
+ new IPrimitiveValueProviderFactory[numSecondaryKeys];
+ for (i = 0; i < numSecondaryKeys; i++) {
+ comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+ .getBinaryComparatorFactory(nestedKeyType, true);
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
+ valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
+ }
+ List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+ for (List<String> partitioningKey : partitioningKeys) {
+ IAType keyType = recType.getSubFieldType(partitioningKey);
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ ++i;
+ }
+ ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
+ IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
+ dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider());
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+ splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp);
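+            // the primary-key fields follow the 2 * dimension MBR key fields in the R-tree tuple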
+ int[] btreeFields = new int[primaryComparatorFactories.length];
+ for (int k = 0; k < btreeFields.length; k++) {
+ btreeFields[k] = k + numSecondaryKeys;
+ }
+
+ ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+ recType, context.getBinaryComparatorFactoryProvider());
+ int[] filterFields = null;
+ int[] rtreeFields = null;
+ if (filterTypeTraits != null) {
+ filterFields = new int[1];
+ filterFields[0] = numSecondaryKeys + numPrimaryKeys;
+ rtreeFields = new int[numSecondaryKeys + numPrimaryKeys];
+ for (int k = 0; k < rtreeFields.length; k++) {
+ rtreeFields[k] = k;
+ }
+ }
+
+ // prepare callback
+ JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
+ int datasetId = dataset.getDatasetId();
+ TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+ IModificationOperationCallbackFactory modificationCallbackFactory = temp
+ ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE)
+ : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE,
+ dataset.hasMetaPart());
+
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+ .getMergePolicyFactory(dataset, mdTxnCtx);
+
+ IBinaryComparatorFactory[] deletedKeyBTreeCompFactories = getMergedComparatorFactories(comparatorFactories,
+ primaryComparatorFactories);
+ IIndexDataflowHelperFactory idff = new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(
+ valueProviderFactories, RTreePolicyType.RTREE, deletedKeyBTreeCompFactories,
+ new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
+ compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
+ AqlMetadataProvider.proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
+ rtreeFields, filterTypeTraits, filterCmpFactories, filterFields, !temp, isPointMBR);
+ IOperatorDescriptor op;
+ if (bulkload) {
+ long numElementsHint = getCardinalityPerPartitionHint(dataset);
+ op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
+ appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
+ primaryComparatorFactories, btreeFields, fieldPermutation,
+ GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idff);
+ } else if (indexOp == IndexOperation.UPSERT) {
+ op = new AsterixLSMTreeUpsertOperatorDescriptor(spec, recordDesc,
+ appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+ splitsAndConstraint.first, typeTraits, comparatorFactories, null, fieldPermutation, idff,
+ filterFactory, false, indexName, null, modificationCallbackFactory,
+ NoOpOperationCallbackFactory.INSTANCE, prevFieldPermutation);
+ } else {
+ op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
+ appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+ splitsAndConstraint.first, typeTraits, comparatorFactories, null, fieldPermutation, indexOp,
+ idff, filterFactory, false, indexName, null, modificationCallbackFactory,
+ NoOpOperationCallbackFactory.INSTANCE);
+ }
+ return new Pair<>(op, splitsAndConstraint.second);
+ } catch (MetadataException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInvertedIndexRuntime(String dataverseName,
+ String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
+ List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
+ AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context,
+ JobSpecification spec, IndexOperation indexOp, IndexType indexType, boolean bulkload,
List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
throws AlgebricksException {
        // Check whether the index is length-partitioned.
@@ -2362,10 +1820,7 @@
throw new AlgebricksException("Cannot create composite inverted index on multiple fields.");
}
- Dataset dataset = findDataset(dataverseName, datasetName);
- if (dataset == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
- }
+ Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
boolean temp = dataset.getDatasetDetails().isTemp();
isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
@@ -2397,42 +1852,41 @@
j++;
}
if (numFilterFields > 0) {
- int idx = propagatedSchema.findVariable(additionalFilteringKeys.get(0));
+ int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
fieldPermutation[numKeys] = idx;
}
- // Find permutations for prev value
- int[] prevFieldPermutation = new int[numKeys + numFilterFields];
- i = 0;
+ int[] prevFieldPermutation = null;
+ if (indexOp == IndexOperation.UPSERT) {
+ // Find permutations for prev value
+ prevFieldPermutation = new int[numKeys + numFilterFields];
+ i = 0;
- // If the index is partitioned: [token, number of token]
- // Otherwise: [token]
- for (LogicalVariable varKey : prevSecondaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- prevFieldPermutation[i] = idx;
- i++;
- }
+ // If the index is partitioned: [token, number of tokens]
+ // Otherwise: [token]
+ for (LogicalVariable varKey : prevSecondaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ prevFieldPermutation[i] = idx;
+ i++;
+ }
- for (int k = 0; k < primaryKeys.size(); k++) {
- prevFieldPermutation[k + i] = fieldPermutation[k + i];
- i++;
- }
+ for (int k = 0; k < primaryKeys.size(); k++) {
+ prevFieldPermutation[k + i] = fieldPermutation[k + i];
+ i++;
+ }
- if (numFilterFields > 0) {
- int idx = propagatedSchema.findVariable(prevAdditionalFilteringKeys.get(0));
- prevFieldPermutation[numKeys] = idx;
+ if (numFilterFields > 0) {
+ int idx = propagatedSchema.findVariable(prevAdditionalFilteringKeys.get(0));
+ prevFieldPermutation[numKeys] = idx;
+ }
}
String itemTypeName = dataset.getItemTypeName();
IAType itemType;
try {
- itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(), itemTypeName)
+ itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName)
.getDatatype();
-
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Only record types can be indexed.");
- }
-
+ validateRecordType(itemType);
ARecordType recType = (ARecordType) itemType;
// Index parameters.
@@ -2459,8 +1913,7 @@
IBinaryComparatorFactory[] invListComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider());
- IAType secondaryKeyType = null;
-
+ IAType secondaryKeyType;
Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
secondaryKeyExprs.get(0), recType);
secondaryKeyType = keyPairType.first;
@@ -2511,8 +1964,7 @@
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- splitProviderAndPartitionConstraintsForDataset(
- dataverseName, datasetName, indexName, temp);
+ splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp);
// prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -2520,10 +1972,10 @@
TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
IModificationOperationCallbackFactory modificationCallbackFactory = temp
? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
ResourceType.LSM_INVERTED_INDEX)
: new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
ResourceType.LSM_INVERTED_INDEX, dataset.hasMetaPart());
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
@@ -2548,208 +2000,102 @@
filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
invertedIndexFieldsForNonBulkLoadOps, !temp);
}
- IOperatorDescriptor op = new AsterixLSMInvertedIndexUpsertOperatorDescriptor(spec, recordDesc,
- appContext.getStorageManagerInterface(), splitsAndConstraint.first,
- appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
- invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation,
- indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName, prevFieldPermutation);
-
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
+ IOperatorDescriptor op;
+ if (bulkload) {
+ long numElementsHint = getCardinalityPerPartitionHint(dataset);
+ op = new LSMInvertedIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation, false,
+ numElementsHint, false, appContext.getStorageManagerInterface(), splitsAndConstraint.first,
+ appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
+ invListsTypeTraits, invListComparatorFactories, tokenizerFactory, indexDataFlowFactory);
+ } else if (indexOp == IndexOperation.UPSERT) {
+ op = new AsterixLSMInvertedIndexUpsertOperatorDescriptor(spec, recordDesc,
+ appContext.getStorageManagerInterface(), splitsAndConstraint.first,
+ appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
+ invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation,
+ indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName,
+ prevFieldPermutation);
+ } else {
+ op = new AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor(spec, recordDesc,
+ appContext.getStorageManagerInterface(), splitsAndConstraint.first,
+ appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
+ invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
+ indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName);
+ }
+ return new Pair<>(op, splitsAndConstraint.second);
} catch (Exception e) {
throw new AlgebricksException(e);
}
}
- //TODO: refactor this method
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeUpsertRuntime(String dataverseName,
- String datasetName, String indexName, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
- List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
- List<LogicalVariable> additionalFilteringKeys, AsterixTupleFilterFactory filterFactory,
- RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
- List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
- throws AlgebricksException {
- try {
- Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+ // Get a tokenizer for bulk-loading data into an n-gram or keyword index.
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBinaryTokenizerRuntime(String dataverseName,
+ String datasetName, String indexName, IOperatorSchema inputSchema, IOperatorSchema propagatedSchema,
+ List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys, RecordDescriptor recordDesc,
+ JobSpecification spec, IndexType indexType) throws AlgebricksException {
- boolean temp = dataset.getDatasetDetails().isTemp();
- isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+ // Sanity checks.
+ if (primaryKeys.size() > 1) {
+ throw new AlgebricksException("Cannot tokenize composite primary key.");
+ }
+ if (secondaryKeys.size() > 1) {
+ throw new AlgebricksException("Cannot tokenize composite secondary key fields.");
+ }
- String itemTypeName = dataset.getItemTypeName();
- IAType itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, itemTypeName).getDatatype();
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Only record types can be indexed.");
- }
- ARecordType recType = (ARecordType) itemType;
- Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
- dataset.getDatasetName(), indexName);
+ boolean isPartitioned = indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
+ || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX;
- List<List<String>> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
- List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
- Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
- secondaryKeyExprs.get(0), recType);
- IAType spatialType = keyPairType.first;
+ // Number of keys that need to be propagated
+ int numKeys = inputSchema.getSize();
- boolean isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT
- || spatialType.getTypeTag() == ATypeTag.POINT3D;
- int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
- int numSecondaryKeys = dimension * 2;
- int numPrimaryKeys = primaryKeys.size();
- int numKeys = numSecondaryKeys + numPrimaryKeys;
- ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys];
-
- int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
- int[] fieldPermutation = new int[numKeys + numFilterFields];
- int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
- int i = 0;
- int j = 0;
-
- // Get field permutation for new value
- for (LogicalVariable varKey : secondaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- i++;
- }
- for (LogicalVariable varKey : primaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- modificationCallbackPrimaryKeyFields[j] = i;
- i++;
- j++;
- }
-
- if (numFilterFields > 0) {
- int idx = propagatedSchema.findVariable(additionalFilteringKeys.get(0));
- fieldPermutation[numKeys] = idx;
- }
-
- // Get field permutation for previous value
- int[] prevFieldPermutation = new int[numKeys + numFilterFields];
- i = 0;
-
- // Get field permutation for new value
- for (LogicalVariable varKey : prevSecondaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- prevFieldPermutation[i] = idx;
- i++;
- }
- for (int k = 0; k < numPrimaryKeys; k++) {
- prevFieldPermutation[k + i] = fieldPermutation[k + i];
- i++;
- }
-
- if (numFilterFields > 0) {
- int idx = propagatedSchema.findVariable(prevAdditionalFilteringKeys.get(0));
- prevFieldPermutation[numKeys] = idx;
- }
-
- IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
- IPrimitiveValueProviderFactory[] valueProviderFactories =
- new IPrimitiveValueProviderFactory[numSecondaryKeys];
- for (i = 0; i < numSecondaryKeys; i++) {
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
- .getBinaryComparatorFactory(nestedKeyType, true);
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
- valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
- }
- List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
- for (List<String> partitioningKey : partitioningKeys) {
- IAType keyType = recType.getSubFieldType(partitioningKey);
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- ++i;
- }
-
- ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
- IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
- dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider());
- IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- splitProviderAndPartitionConstraintsForDataset(
- dataverseName, datasetName, indexName, temp);
- int[] btreeFields = new int[primaryComparatorFactories.length];
- for (int k = 0; k < btreeFields.length; k++) {
- btreeFields[k] = k + numSecondaryKeys;
- }
-
- ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
- IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
- recType, context.getBinaryComparatorFactoryProvider());
- int[] filterFields = null;
- int[] rtreeFields = null;
- if (filterTypeTraits != null) {
- filterFields = new int[1];
- filterFields[0] = numSecondaryKeys + numPrimaryKeys;
- rtreeFields = new int[numSecondaryKeys + numPrimaryKeys];
- for (int k = 0; k < rtreeFields.length; k++) {
- rtreeFields[k] = k;
+ // Collect the remaining logical variables that are neither primary nor
+ // secondary keys, along with their positions.
+ // These variables will be propagated through the TokenizeOperator.
+ List<LogicalVariable> otherKeys = new ArrayList<>();
+ if (inputSchema.getSize() > 0) {
+ for (int k = 0; k < inputSchema.getSize(); k++) {
+ boolean found = false;
+ for (LogicalVariable varKey : primaryKeys) {
+ if (varKey.equals(inputSchema.getVariable(k))) {
+ found = true;
+ break;
+ } else {
+ found = false;
+ }
+ }
+ if (!found) {
+ for (LogicalVariable varKey : secondaryKeys) {
+ if (varKey.equals(inputSchema.getVariable(k))) {
+ found = true;
+ break;
+ } else {
+ found = false;
+ }
+ }
+ }
+ if (!found) {
+ otherKeys.add(inputSchema.getVariable(k));
}
}
-
- // prepare callback
- JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
- int datasetId = dataset.getDatasetId();
- TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
- IModificationOperationCallbackFactory modificationCallbackFactory = temp
- ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
- ResourceType.LSM_RTREE)
- : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
- ResourceType.LSM_RTREE, dataset.hasMetaPart());
-
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
- .getMergePolicyFactory(dataset, mdTxnCtx);
- IBinaryComparatorFactory[] deletedKeyBTreeCompFactories = getMergedComparatorFactories(comparatorFactories,
- primaryComparatorFactories);
- IIndexDataflowHelperFactory idff = new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(
- valueProviderFactories, RTreePolicyType.RTREE, deletedKeyBTreeCompFactories,
- new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
- compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
- AqlMetadataProvider.proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
- rtreeFields, filterTypeTraits, filterCmpFactories, filterFields, !temp, isPointMBR);
- AsterixLSMTreeUpsertOperatorDescriptor op = new AsterixLSMTreeUpsertOperatorDescriptor(spec, recordDesc,
- appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
- splitsAndConstraint.first, typeTraits, comparatorFactories, null, fieldPermutation, idff,
- filterFactory, false, indexName, null, modificationCallbackFactory,
- NoOpOperationCallbackFactory.INSTANCE, prevFieldPermutation);
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
- } catch (MetadataException e) {
- throw new AlgebricksException(e);
}
- }
- //TODO: refactor this method
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeUpsertRuntime(String dataverseName,
- String datasetName, String indexName, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
- List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
- List<LogicalVariable> additionalFilteringKeys, AsterixTupleFilterFactory filterFactory,
- RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
- List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
- throws AlgebricksException {
- // we start with the btree
- Dataset dataset = findDataset(dataverseName, datasetName);
- if (dataset == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
- }
- boolean temp = dataset.getDatasetDetails().isTemp();
- isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+ // Layout for tokenization, sorting, and loading:
+ // one token (+ an optional partitioning field) + primary keys + secondary
+ // keys + other variables.
+ // Secondary keys and other variables are simply passed through to the
+ // IndexInsertDelete operator.
+ int numTokenKeyPairFields = (!isPartitioned) ? 1 + numKeys : 2 + numKeys;
- int numKeys = primaryKeys.size() + secondaryKeys.size();
- int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
+ // generate field permutations for the input
+ int[] fieldPermutation = new int[numKeys];
- // generate field permutations
- int[] fieldPermutation = new int[numKeys + numFilterFields];
- int[] bloomFilterKeyFields = new int[secondaryKeys.size()];
int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
int i = 0;
int j = 0;
- for (LogicalVariable varKey : secondaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- bloomFilterKeyFields[i] = i;
- i++;
- }
for (LogicalVariable varKey : primaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
@@ -2757,39 +2103,26 @@
i++;
j++;
}
- // Filter can only be one field!
- if (numFilterFields > 0) {
- int idx = propagatedSchema.findVariable(additionalFilteringKeys.get(0));
- fieldPermutation[numKeys] = idx;
+ for (LogicalVariable varKey : otherKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ i++;
+ }
+ for (LogicalVariable varKey : secondaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ i++;
}
- // generate field permutations for prev record
- int[] prevFieldPermutation = new int[numKeys + numFilterFields];
- int k = 0;
- for (LogicalVariable varKey : prevSecondaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- prevFieldPermutation[k] = idx;
- k++;
- }
- for (LogicalVariable varKey : primaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- prevFieldPermutation[k] = idx;
- k++;
- }
- // Filter can only be one field!
- if (numFilterFields > 0) {
- int idx = propagatedSchema.findVariable(prevAdditionalFilteringKeys.get(0));
- prevFieldPermutation[numKeys] = idx;
- }
-
+ Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
String itemTypeName = dataset.getItemTypeName();
IAType itemType;
try {
- itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(), itemTypeName)
+ itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName)
.getDatatype();
if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Only record types can be indexed.");
+ throw new AlgebricksException("Only record types can be tokenized.");
}
ARecordType recType = (ARecordType) itemType;
@@ -2798,74 +2131,118 @@
Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), indexName);
- ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
- IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
- recType, context.getBinaryComparatorFactoryProvider());
- int[] filterFields = null;
- int[] btreeFields = null;
- if (filterTypeTraits != null) {
- filterFields = new int[1];
- filterFields[0] = numKeys;
- btreeFields = new int[numKeys];
- for (int l = 0; l < btreeFields.length; l++) {
- btreeFields[l] = l;
- }
- }
+ List<List<String>> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
+ List<IAType> secondaryKeyTypeEntries = secondaryIndex.getKeyFieldTypes();
- List<List<String>> secondaryKeyNames = secondaryIndex.getKeyFieldNames();
- List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
- ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
- for (i = 0; i < secondaryKeys.size(); ++i) {
- Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
- secondaryKeyNames.get(i), recType);
- IAType keyType = keyPairType.first;
- comparatorFactories[i] =
- AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- }
+ int numTokenFields = (!isPartitioned) ? secondaryKeys.size() : secondaryKeys.size() + 1;
+ ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenFields];
+ ITypeTraits[] invListsTypeTraits = new ITypeTraits[primaryKeys.size()];
+
+ // Find the type of the secondary key. If it is a derived type,
+ // return the derived type itself,
+ // e.g., an UNORDERED LIST key yields the UNORDERED LIST type.
+ IAType secondaryKeyType;
+ Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypeEntries.get(0),
+ secondaryKeyExprs.get(0), recType);
+ secondaryKeyType = keyPairType.first;
List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+ i = 0;
for (List<String> partitioningKey : partitioningKeys) {
IAType keyType = recType.getSubFieldType(partitioningKey);
- comparatorFactories[i] =
- AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ invListsTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
++i;
}
- IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+ tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
+ if (isPartitioned) {
+ // The partitioning field is hardcoded to be a short *without*
+ // an Asterix type tag.
+ tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS;
+ }
+
+ IBinaryTokenizerFactory tokenizerFactory = NonTaggedFormatUtil.getBinaryTokenizerFactory(
+ secondaryKeyType.getTypeTag(), indexType, secondaryIndex.getGramLength());
+
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- splitProviderAndPartitionConstraintsForDataset(
- dataverseName, datasetName, indexName, temp);
+ splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName,
+ dataset.getDatasetDetails().isTemp());
- // prepare callback
- JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
- int datasetId = dataset.getDatasetId();
- TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
- IModificationOperationCallbackFactory modificationCallbackFactory = temp
- ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
- ResourceType.LSM_BTREE)
- : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
- ResourceType.LSM_BTREE, dataset.hasMetaPart());
+ // Generate the output record format
+ ISerializerDeserializer<?>[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
+ ITypeTraits[] tokenKeyPairTypeTraits = new ITypeTraits[numTokenKeyPairFields];
+ ISerializerDeserializerProvider serdeProvider = FormatUtils.getDefaultFormat().getSerdeProvider();
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
- .getMergePolicyFactory(dataset, mdTxnCtx);
- IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
- new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
- new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
- storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, filterCmpFactories,
- btreeFields, filterFields, !temp);
- AsterixLSMTreeUpsertOperatorDescriptor op = new AsterixLSMTreeUpsertOperatorDescriptor(spec, recordDesc,
- appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
- splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation,
- idfh, filterFactory, false, indexName, null, modificationCallbackFactory,
- NoOpOperationCallbackFactory.INSTANCE, prevFieldPermutation);
- return new Pair<>(op, splitsAndConstraint.second);
+ // The order of the output record: propagated variables (including
+ // PK and SK), token, and number of tokens.
+ // #1. propagate all input variables
+ for (int k = 0; k < recordDesc.getFieldCount(); k++) {
+ tokenKeyPairFields[k] = recordDesc.getFields()[k];
+ tokenKeyPairTypeTraits[k] = recordDesc.getTypeTraits()[k];
+ }
+ int tokenOffset = recordDesc.getFieldCount();
+
+ // #2. Specify the token type
+ tokenKeyPairFields[tokenOffset] = serdeProvider.getSerializerDeserializer(secondaryKeyType);
+ tokenKeyPairTypeTraits[tokenOffset] = tokenTypeTraits[0];
+ tokenOffset++;
+
+ // #3. Specify the length-partitioning key: number of tokens
+ if (isPartitioned) {
+ tokenKeyPairFields[tokenOffset] = ShortSerializerDeserializer.INSTANCE;
+ tokenKeyPairTypeTraits[tokenOffset] = tokenTypeTraits[1];
+ }
+
+ RecordDescriptor tokenKeyPairRecDesc = new RecordDescriptor(tokenKeyPairFields, tokenKeyPairTypeTraits);
+ IOperatorDescriptor tokenizerOp;
+
+ // Key to be tokenized: the secondary key (SK)
+ int docField = fieldPermutation[fieldPermutation.length - 1];
+
+ // Keys to be propagated
+ int[] keyFields = new int[numKeys];
+ for (int k = 0; k < keyFields.length; k++) {
+ keyFields[k] = k;
+ }
+
+ tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec, tokenKeyPairRecDesc, tokenizerFactory, docField,
+ keyFields, isPartitioned, true);
+ return new Pair<>(tokenizerOp, splitsAndConstraint.second);
} catch (Exception e) {
throw new AlgebricksException(e);
}
}
-}
+
+ private IBinaryComparatorFactory[] getMergedComparatorFactories(IBinaryComparatorFactory[] comparatorFactories,
+ IBinaryComparatorFactory[] primaryComparatorFactories) {
+ IBinaryComparatorFactory[] btreeCompFactories;
+ int btreeCompFactoriesCount = comparatorFactories.length + primaryComparatorFactories.length;
+ btreeCompFactories = new IBinaryComparatorFactory[btreeCompFactoriesCount];
+ int i = 0;
+ for (; i < comparatorFactories.length; i++) {
+ btreeCompFactories[i] = comparatorFactories[i];
+ }
+ for (int j = 0; i < btreeCompFactoriesCount; i++, j++) {
+ btreeCompFactories[i] = primaryComparatorFactories[j];
+ }
+ return btreeCompFactories;
+ }
+
+ private AsterixTupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
+ IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context)
+ throws AlgebricksException {
+ // No filtering condition.
+ if (filterExpr == null) {
+ return null;
+ }
+ IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
+ IScalarEvaluatorFactory filterEvalFactory = expressionRuntimeProvider.createEvaluatorFactory(filterExpr,
+ typeEnv, inputSchemas, context);
+ return new AsterixTupleFilterFactory(filterEvalFactory, context.getBinaryBooleanInspectorFactory());
+ }
+
+ private void validateRecordType(IAType itemType) throws AlgebricksException {
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException("Only record types can be indexed.");
+ }
+ }
+}
\ No newline at end of file
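
Note on the dispatch shape kept in AqlMetadataProvider: each secondary-index runtime method above now builds exactly one of three operators, chosen by the bulkload flag and the index operation, instead of duplicating that choice per operation. A condensed, illustrative sketch of that shape follows; the constructor argument lists are elided here, and the full parameters are the ones shown in the diff above.

    IOperatorDescriptor op;
    if (bulkload) {
        // one-time load: no transaction callbacks, uses a per-partition cardinality hint
        op = new TreeIndexBulkLoadOperatorDescriptor(/* spec, recordDesc, storage, splits, ... */);
    } else if (indexOp == IndexOperation.UPSERT) {
        // upsert: additionally needs the field permutation of the previous record
        op = new AsterixLSMTreeUpsertOperatorDescriptor(/* ..., prevFieldPermutation */);
    } else {
        // plain insert or delete
        op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(/* ..., indexOp */);
    }
    return new Pair<>(op, splitsAndConstraint.second);
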
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
new file mode 100644
index 0000000..0b22dab
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.declared;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.DatasourceAdapter;
+import org.apache.asterix.metadata.entities.Datatype;
+import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedPolicyEntity;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.NodeGroup;
+import org.apache.asterix.metadata.utils.MetadataConstants;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+
+public class MetadataManagerUtil {
+
+ private MetadataManagerUtil() {
+ throw new AssertionError("This util class should not be instantiated.");
+ }
+
+ public static IAType findType(MetadataTransactionContext mdTxnCtx, String dataverse, String typeName)
+ throws AlgebricksException {
+ if (dataverse == null || typeName == null) {
+ return null;
+ }
+ Datatype type;
+ try {
+ type = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverse, typeName);
+ } catch (MetadataException e) {
+ throw new AlgebricksException(
+ "Metadata exception while looking up type '" + typeName + "' in dataverse '" + dataverse + "'", e);
+ }
+ if (type == null) {
+ throw new AlgebricksException("Type name '" + typeName + "' unknown in dataverse '" + dataverse + "'");
+ }
+ return type.getDatatype();
+ }
+
+ public static ARecordType findOutputRecordType(MetadataTransactionContext mdTxnCtx, String dataverse,
+ String outputRecordType) throws AlgebricksException {
+ if (outputRecordType == null) {
+ return null;
+ }
+ if (dataverse == null) {
+ throw new AlgebricksException("Cannot declare output-record-type with no dataverse!");
+ }
+ IAType type = findType(mdTxnCtx, dataverse, outputRecordType);
+ if (!(type instanceof ARecordType)) {
+ throw new AlgebricksException("Type " + outputRecordType + " is not a record type!");
+ }
+ return (ARecordType) type;
+ }
+
+ public static DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, String dataverseName,
+ String adapterName) throws MetadataException {
+ DatasourceAdapter adapter;
+ // search in default namespace (built-in adapter)
+ adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
+
+ // search in dataverse (user-defined adapter)
+ if (adapter == null) {
+ adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, dataverseName, adapterName);
+ }
+ return adapter;
+ }
+
+ public static Dataset findDataset(MetadataTransactionContext mdTxnCtx, String dataverse, String dataset)
+ throws AlgebricksException {
+ try {
+ return MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, dataset);
+ } catch (MetadataException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ public static Dataset findExistingDataset(MetadataTransactionContext mdTxnCtx, String dataverseName,
+ String datasetName) throws AlgebricksException {
+ Dataset dataset = findDataset(mdTxnCtx, dataverseName, datasetName);
+ if (dataset == null) {
+ throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
+ }
+ return dataset;
+ }
+
+ public static INodeDomain findNodeDomain(MetadataTransactionContext mdTxnCtx, String nodeGroupName)
+ throws AlgebricksException {
+ NodeGroup nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodeGroupName);
+ List<String> partitions = new ArrayList<>();
+ for (String location : nodeGroup.getNodeNames()) {
+ int numPartitions = AsterixClusterProperties.INSTANCE.getNodePartitionsCount(location);
+ for (int i = 0; i < numPartitions; i++) {
+ partitions.add(location);
+ }
+ }
+ return new DefaultNodeGroupDomain(partitions);
+ }
+
+ public static Feed findFeed(MetadataTransactionContext mdTxnCtx, String dataverse, String feedName)
+ throws AlgebricksException {
+ try {
+ return MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverse, feedName);
+ } catch (MetadataException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ public static FeedPolicyEntity findFeedPolicy(MetadataTransactionContext mdTxnCtx, String dataverse,
+ String policyName) throws AlgebricksException {
+ try {
+ return MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx, dataverse, policyName);
+ } catch (MetadataException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ public static List<Index> getDatasetIndexes(MetadataTransactionContext mdTxnCtx, String dataverseName,
+ String datasetName) throws AlgebricksException {
+ try {
+ return MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
+ } catch (MetadataException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ public static AqlDataSource findDataSource(MetadataTransactionContext mdTxnCtx, AqlSourceId id)
+ throws AlgebricksException {
+ try {
+ return lookupSourceInMetadata(mdTxnCtx, id);
+ } catch (MetadataException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ public static AqlDataSource lookupSourceInMetadata(MetadataTransactionContext mdTxnCtx, AqlSourceId aqlId)
+ throws AlgebricksException {
+ Dataset dataset = findDataset(mdTxnCtx, aqlId.getDataverseName(), aqlId.getDatasourceName());
+ if (dataset == null) {
+ throw new AlgebricksException("Datasource with id " + aqlId + " was not found.");
+ }
+ IAType itemType = findType(mdTxnCtx, dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+ IAType metaItemType = findType(mdTxnCtx, dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
+ INodeDomain domain = findNodeDomain(mdTxnCtx, dataset.getNodeGroupName());
+ byte datasourceType = dataset.getDatasetType().equals(DatasetType.EXTERNAL) ? AqlDataSourceType.EXTERNAL_DATASET
+ : AqlDataSourceType.INTERNAL_DATASET;
+ return new DatasetDataSource(aqlId, dataset, itemType, metaItemType, datasourceType,
+ dataset.getDatasetDetails(), domain);
+ }
+}
\ No newline at end of file
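
MetadataManagerUtil moves the repeated lookup-and-validate boilerplate out of AqlMetadataProvider. The caller-side effect, as a minimal before/after sketch (mdTxnCtx, dataverseName, and datasetName are assumed to be in scope exactly as in the AqlMetadataProvider methods above):

    // before: each call site looked up the dataset and repeated the null check
    Dataset dataset = findDataset(dataverseName, datasetName);
    if (dataset == null) {
        throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
    }

    // after: the existence check lives in one place
    Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
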