Added a SecondaryIndexCreator class hierarchy so the B-tree, R-tree, and inverted keyword index creation jobs share code. Still needs some cleanup.
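
The new SecondaryIndexCreator base class owns the pieces common to all
secondary-index build jobs (dummy key provider, primary index scan), and a
factory picks the subclass for the requested index type. Roughly, the new
call path in IndexOperations is:

    SecondaryIndexCreator creator = SecondaryIndexCreator.createIndexCreator(
            createIndexStmt.getIndexType(), physicalOptimizationConfig);
    JobSpecification spec = creator.createJobSpec(createIndexStmt, metadata);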
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix-fix-issue-9@197 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
index c0053fb..39b9a3a 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
@@ -1,69 +1,25 @@
package edu.uci.ics.asterix.file;
-import java.io.DataOutput;
-import java.util.List;
import java.util.logging.Logger;
import edu.uci.ics.asterix.aql.translator.DdlTranslator.CompiledIndexDropStatement;
-import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.config.OptimizationConfUtil;
import edu.uci.ics.asterix.common.context.AsterixStorageManagerInterface;
import edu.uci.ics.asterix.common.context.AsterixTreeRegistryProvider;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
-import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
-import edu.uci.ics.asterix.formats.nontagged.AqlBinaryTokenizerFactoryProvider;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
import edu.uci.ics.asterix.metadata.MetadataException;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
-import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
-import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
import edu.uci.ics.asterix.translator.DmlTranslator.CompiledCreateIndexStatement;
-import edu.uci.ics.hyracks.algebricks.core.algebra.data.ISerializerDeserializerProvider;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
-import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.impl.ConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.AssignRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraintHelper;
import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import edu.uci.ics.hyracks.algebricks.core.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.core.utils.Triple;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDropOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.invertedindex.tokenizers.IBinaryTokenizerFactory;
-import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
public class IndexOperations {
@@ -74,31 +30,9 @@
private static final Logger LOGGER = Logger.getLogger(IndexOperations.class.getName());
public static JobSpecification buildCreateIndexJobSpec(CompiledCreateIndexStatement createIndexStmt,
- AqlCompiledMetadataDeclarations datasetDecls) throws AsterixException, AlgebricksException {
-
- switch (createIndexStmt.getIndexType()) {
- case BTREE: {
- return createBtreeIndexJobSpec(createIndexStmt, datasetDecls);
- }
-
- case RTREE: {
- return createRtreeIndexJobSpec(createIndexStmt, datasetDecls);
- }
-
- case KEYWORD: {
- return createKeywordIndexJobSpec(createIndexStmt, datasetDecls);
- }
-
- case QGRAM: {
- // return createQgramIndexJobSpec(createIndexStmt,
- // datasetDecls);
- }
-
- default: {
- throw new AsterixException("Unknown Index Type: " + createIndexStmt.getIndexType());
- }
-
- }
+ AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
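+        // Delegate job construction to the creator for this index type.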
+        SecondaryIndexCreator secondaryIndexCreator = SecondaryIndexCreator.createIndexCreator(
+                createIndexStmt.getIndexType(), physicalOptimizationConfig);
+ return secondaryIndexCreator.createJobSpec(createIndexStmt, metadata);
}
public static JobSpecification createSecondaryIndexDropJobSpec(CompiledIndexDropStatement deleteStmt,
@@ -121,595 +55,6 @@
return spec;
}
- public static AbstractOperatorDescriptor createDummyKeyProviderOp(
- JobSpecification spec) throws AsterixException, AlgebricksException {
- // Build dummy tuple containing one field with a dummy value inside.
- ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
- DataOutput dos = tb.getDataOutput();
- tb.reset();
- try {
- // Serialize dummy value into a field.
- IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
- } catch (HyracksDataException e) {
- throw new AsterixException(e);
- }
- // Add dummy field.
- tb.addFieldEndOffset();
- ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
- RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
- ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(
- spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
- tb.getSize());
- return keyProviderOp;
- }
-
- public static BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(
- JobSpecification spec, AqlCompiledMetadataDeclarations metadata,
- AqlCompiledDatasetDecl compiledDatasetDecl, ARecordType itemType,
- ISerializerDeserializer payloadSerde,
- IFileSplitProvider splitProvider) throws AlgebricksException,
- MetadataException {
- int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(
- compiledDatasetDecl).size();
- ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
- ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
- IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
- ISerializerDeserializerProvider serdeProvider = metadata.getFormat()
- .getSerdeProvider();
- List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
- .getPartitioningFunctions(compiledDatasetDecl);
- int i = 0;
- for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : partitioningFunctions) {
- IAType keyType = evalFactoryAndType.third;
- ISerializerDeserializer keySerde = serdeProvider
- .getSerializerDeserializer(keyType);
- primaryRecFields[i] = keySerde;
- primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
- .getBinaryComparatorFactory(keyType, OrderKind.ASC);
- primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE
- .getTypeTrait(keyType);
- ++i;
- }
- primaryRecFields[numPrimaryKeys] = payloadSerde;
- primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE
- .getTypeTrait(itemType);
-
- // -Infinity
- int[] lowKeyFields = null;
- // +Infinity
- int[] highKeyFields = null;
- RecordDescriptor primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
-
- BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(
- spec, primaryRecDesc, AsterixStorageManagerInterface.INSTANCE,
- AsterixTreeRegistryProvider.INSTANCE, splitProvider,
- primaryTypeTraits, primaryComparatorFactories, lowKeyFields,
- highKeyFields, true, true, new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
- return primarySearchOp;
- }
-
-
- @SuppressWarnings("unchecked")
- public static JobSpecification createBtreeIndexJobSpec(CompiledCreateIndexStatement createIndexStmt,
- AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
- JobSpecification spec = new JobSpecification();
- String datasetName = createIndexStmt.getDatasetName();
- String secondaryIndexName = createIndexStmt.getIndexName();
- AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
- if (compiledDatasetDecl == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName);
- }
- if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
- throw new AsterixException("Cannot index an external dataset (" + datasetName + ").");
- }
- ARecordType itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());
- ISerializerDeserializer payloadSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(itemType);
- int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
-
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndConstraint = metadata
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
-
- // Create dummy key provider for feeding the primary index scan.
- AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
- spec, keyProviderOp, splitProviderAndConstraint.second);
-
- // Create primary index scan op.
- BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(
- spec, metadata, compiledDatasetDecl, itemType, payloadSerde,
- splitProviderAndConstraint.first);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
- spec, keyProviderOp, splitProviderAndConstraint.second);
-
- // ---------- START ASSIGN OP
-
- List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
- int numSecondaryKeys = secondaryKeyFields.size();
- ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
- IEvaluatorFactory[] evalFactories = new IEvaluatorFactory[numSecondaryKeys];
- IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys
- + numPrimaryKeys];
- ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
- ISerializerDeserializerProvider serdeProvider = metadata.getFormat()
- .getSerdeProvider();
- for (int i = 0; i < numSecondaryKeys; i++) {
- evalFactories[i] = metadata.getFormat().getFieldAccessEvaluatorFactory(itemType, secondaryKeyFields.get(i),
- numPrimaryKeys);
- IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(i), itemType);
- ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
- secondaryRecFields[i] = keySerde;
- secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
- keyType, OrderKind.ASC);
- secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- }
- // Fill in serializers and comparators for primary index fields.
- RecordDescriptor primaryRecDesc = primaryScanOp.getRecordDescriptor();
- IBinaryComparatorFactory[] primaryComparatorFactories = primaryScanOp.getTreeIndexComparatorFactories();
- for (int i = 0; i < numPrimaryKeys; i++) {
- secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
- secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
- secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
- }
- RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
-
- int[] outColumns = new int[numSecondaryKeys];
- int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
- for (int i = 0; i < numSecondaryKeys; i++) {
- outColumns[i] = numPrimaryKeys + i + 1;
- }
- int projCount = 0;
- for (int i = 0; i < numSecondaryKeys; i++) {
- projectionList[projCount++] = numPrimaryKeys + i + 1;
- }
- for (int i = 0; i < numPrimaryKeys; i++) {
- projectionList[projCount++] = i;
- }
-
- AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
- AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
- new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
- splitProviderAndConstraint.second);
-
- // ---------- END ASSIGN OP
-
- // ---------- START EXTERNAL SORT OP
-
- int[] sortFields = new int[numSecondaryKeys + numPrimaryKeys];
- for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++) {
- sortFields[i] = i;
- }
- ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
- physicalOptimizationConfig.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories,
- secondaryRecDesc);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp,
- splitProviderAndConstraint.second);
-
- // ---------- END EXTERNAL SORT OP
-
- // ---------- START SECONDARY INDEX BULK LOAD
-
- int[] fieldPermutation = new int[numSecondaryKeys + numPrimaryKeys];
- for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++) {
- fieldPermutation[i] = i;
- }
-
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, secondaryIndexName);
-
- // GlobalConfig.DEFAULT_BTREE_FILL_FACTOR
- TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(
- spec, AsterixStorageManagerInterface.INSTANCE, AsterixTreeRegistryProvider.INSTANCE,
- secondarySplitsAndConstraint.first, secondaryTypeTraits,
- secondaryComparatorFactories, fieldPermutation, 0.7f,
- new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryBulkLoadOp,
- secondarySplitsAndConstraint.second);
-
- // ---------- END SECONDARY INDEX BULK LOAD
-
- // Connect the operators.
- spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
- spec.addRoot(secondaryBulkLoadOp);
- spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
- return spec;
-
- }
-
- @SuppressWarnings("unchecked")
- public static JobSpecification createRtreeIndexJobSpec(CompiledCreateIndexStatement createIndexStmt,
- AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
- JobSpecification spec = new JobSpecification();
- String datasetName = createIndexStmt.getDatasetName();
- String secondaryIndexName = createIndexStmt.getIndexName();
- AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
- if (compiledDatasetDecl == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName);
- }
- if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
- throw new AsterixException("Cannot index an external dataset (" + datasetName + ").");
- }
- ARecordType itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());
- ISerializerDeserializer payloadSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(itemType);
- int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
-
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndConstraint = metadata
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
-
- // Create dummy key provider for feeding the primary index scan.
- AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
- spec, keyProviderOp, splitProviderAndConstraint.second);
-
- // Create primary index scan op.
- BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(
- spec, metadata, compiledDatasetDecl, itemType, payloadSerde,
- splitProviderAndConstraint.first);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
- spec, keyProviderOp, splitProviderAndConstraint.second);
-
- // ---------- START ASSIGN OP
-
- List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
- int numSecondaryKeys = secondaryKeyFields.size();
-
- if (numSecondaryKeys != 1) {
- throw new AsterixException(
- "Cannot use "
- + numSecondaryKeys
- + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
- }
-
- IAType spatialType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(0), itemType);
- if (spatialType == null) {
- throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
- }
-
- int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
- int numNestedSecondaryKeyFields = dimension * 2;
-
- IEvaluatorFactory[] evalFactories = metadata.getFormat().createMBRFactory(itemType, secondaryKeyFields.get(0),
- numPrimaryKeys, dimension);
-
- ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
- + numNestedSecondaryKeyFields];
- IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
- ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
- IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
-
- IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(0), itemType);
- IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
- for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
- ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(nestedKeyType);
- secondaryRecFields[i] = keySerde;
- secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
- nestedKeyType, OrderKind.ASC);
- secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
- valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
- }
-
- // Fill in serializers and comparators for primary index fields.
- RecordDescriptor primaryRecDesc = primaryScanOp.getRecordDescriptor();
- IBinaryComparatorFactory[] primaryComparatorFactories = primaryScanOp.getTreeIndexComparatorFactories();
- for (int i = 0; i < numPrimaryKeys; i++) {
- secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecDesc.getFields()[i];
- secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryRecDesc.getTypeTraits()[i];
- }
- RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
-
- int[] outColumns = new int[numNestedSecondaryKeyFields];
- int[] projectionList = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
- for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
- outColumns[i] = numPrimaryKeys + i + 1;
- }
- int projCount = 0;
- for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
- projectionList[projCount++] = numPrimaryKeys + i + 1;
- }
- for (int i = 0; i < numPrimaryKeys; i++) {
- projectionList[projCount++] = i;
- }
-
- AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
- AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
- new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
-
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
- splitProviderAndConstraint.second);
-
- // ---------- END ASSIGN OP
-
- // ---------- START SECONDARY INDEX BULK LOAD
-
- int[] fieldPermutation = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
- for (int i = 0; i < numNestedSecondaryKeyFields + numPrimaryKeys; i++) {
- fieldPermutation[i] = i;
- }
-
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, secondaryIndexName);
-
- TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(
- spec, AsterixStorageManagerInterface.INSTANCE, AsterixTreeRegistryProvider.INSTANCE,
- secondarySplitsAndConstraint.first, secondaryTypeTraits,
- secondaryComparatorFactories, fieldPermutation, 0.7f,
- new RTreeDataflowHelperFactory(valueProviderFactories),
- NoOpOperationCallbackProvider.INSTANCE);
-
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryBulkLoadOp,
- secondarySplitsAndConstraint.second);
-
- // ---------- END SECONDARY INDEX BULK LOAD
-
- // Connect the operators.
- spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, secondaryBulkLoadOp, 0);
- spec.addRoot(secondaryBulkLoadOp);
- spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
- return spec;
-
- }
-
- @SuppressWarnings("unchecked")
- public static JobSpecification createKeywordIndexJobSpec(CompiledCreateIndexStatement createIndexStmt,
- AqlCompiledMetadataDeclarations datasetDecls) throws AsterixException, AlgebricksException {
-
- JobSpecification spec = new JobSpecification();
-
- String primaryIndexName = createIndexStmt.getDatasetName();
- String secondaryIndexName = createIndexStmt.getIndexName();
-
- AqlCompiledDatasetDecl compiledDatasetDecl = datasetDecls.findDataset(primaryIndexName);
- if (compiledDatasetDecl == null) {
- throw new AsterixException("Could not find dataset " + primaryIndexName);
- }
-
- if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
- throw new AsterixException("Cannot index an external dataset (" + primaryIndexName + ").");
- }
- ARecordType itemType = (ARecordType) datasetDecls.findType(compiledDatasetDecl.getItemTypeName());
- ISerializerDeserializerProvider serdeProvider = datasetDecls.getFormat().getSerdeProvider();
- ISerializerDeserializer payloadSerde = serdeProvider.getSerializerDeserializer(itemType);
-
- int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
-
- // sanity
- if (numPrimaryKeys > 1)
- throw new AsterixException("Cannot create inverted keyword index on dataset with composite primary key.");
-
- // sanity
- IAType fieldsToTokenizeType = AqlCompiledIndexDecl
- .keyFieldType(createIndexStmt.getKeyFields().get(0), itemType);
- for (String fieldName : createIndexStmt.getKeyFields()) {
- IAType nextFieldToTokenizeType = AqlCompiledIndexDecl.keyFieldType(fieldName, itemType);
- if (nextFieldToTokenizeType.getTypeTag() != fieldsToTokenizeType.getTypeTag()) {
- throw new AsterixException(
- "Cannot create inverted keyword index. Fields to tokenize must be of the same type.");
- }
- }
-
- // ---------- START GENERAL BTREE STUFF
-
- IIndexRegistryProvider<IIndex> treeRegistryProvider = AsterixTreeRegistryProvider.INSTANCE;
- IStorageManagerInterface storageManager = AsterixStorageManagerInterface.INSTANCE;
-
- // ---------- END GENERAL BTREE STUFF
-
- // ---------- START KEY PROVIDER OP
-
- // TODO: should actually be empty tuple source
- // build tuple containing low and high search keys
- ArrayTupleBuilder tb = new ArrayTupleBuilder(1); // just one dummy field
- DataOutput dos = tb.getDataOutput();
-
- try {
- tb.reset();
- IntegerSerializerDeserializer.INSTANCE.serialize(0, dos); // dummy
- // field
- tb.addFieldEndOffset();
- } catch (HyracksDataException e) {
- throw new AsterixException(e);
- }
-
- ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
- RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
-
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> keyProviderSplitsAndConstraint = datasetDecls
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
-
- ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
- keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp,
- keyProviderSplitsAndConstraint.second);
-
- // ---------- END KEY PROVIDER OP
-
- // ---------- START PRIMARY INDEX SCAN
-
- ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
- IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
- ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
- int i = 0;
- for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
- .getPartitioningFunctions(compiledDatasetDecl)) {
- IAType keyType = evalFactoryAndType.third;
- ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
- primaryRecFields[i] = keySerde;
- primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
- keyType, OrderKind.ASC);
- primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- ++i;
- }
- primaryRecFields[numPrimaryKeys] = payloadSerde;
- primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
-
- ITreeIndexFrameFactory primaryInteriorFrameFactory = AqlMetadataProvider
- .createBTreeNSMInteriorFrameFactory(primaryTypeTraits);
- ITreeIndexFrameFactory primaryLeafFrameFactory = AqlMetadataProvider
- .createBTreeNSMLeafFrameFactory(primaryTypeTraits);
-
- int[] lowKeyFields = null; // -infinity
- int[] highKeyFields = null; // +infinity
- RecordDescriptor primaryRecDesc = new RecordDescriptor(primaryRecFields);
-
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = datasetDecls
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
-
- BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
- storageManager, treeRegistryProvider, primarySplitsAndConstraint.first, primaryTypeTraits, primaryComparatorFactories, lowKeyFields,
- highKeyFields, true, true, new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
- primarySplitsAndConstraint.second);
-
- // ---------- END PRIMARY INDEX SCAN
-
- // ---------- START ASSIGN OP
-
- List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
- int numSecondaryKeys = secondaryKeyFields.size();
- ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
- IEvaluatorFactory[] evalFactories = new IEvaluatorFactory[numSecondaryKeys];
- for (i = 0; i < numSecondaryKeys; i++) {
- evalFactories[i] = datasetDecls.getFormat().getFieldAccessEvaluatorFactory(itemType,
- secondaryKeyFields.get(i), numPrimaryKeys);
- IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(i), itemType);
- ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
- secondaryRecFields[i] = keySerde;
- }
- // fill in serializers and comparators for primary index fields
- for (i = 0; i < numPrimaryKeys; i++) {
- secondaryRecFields[numSecondaryKeys + i] = primaryRecFields[i];
- }
- RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
-
- int[] outColumns = new int[numSecondaryKeys];
- int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
- for (i = 0; i < numSecondaryKeys; i++) {
- outColumns[i] = numPrimaryKeys + i + 1;
- }
- int projCount = 0;
- for (i = 0; i < numSecondaryKeys; i++) {
- projectionList[projCount++] = numPrimaryKeys + i + 1;
- }
- for (i = 0; i < numPrimaryKeys; i++) {
- projectionList[projCount++] = i;
- }
-
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> assignSplitsAndConstraint = datasetDecls
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
-
- AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
- AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
- new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
- assignSplitsAndConstraint.second);
-
- // ---------- END ASSIGN OP
-
- // ---------- START TOKENIZER OP
-
- int numTokenKeyPairFields = numPrimaryKeys + 1;
-
- ISerializerDeserializer[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
- tokenKeyPairFields[0] = serdeProvider.getSerializerDeserializer(fieldsToTokenizeType);
- for (i = 0; i < numPrimaryKeys; i++)
- tokenKeyPairFields[i + 1] = secondaryRecFields[numSecondaryKeys + i];
- RecordDescriptor tokenKeyPairRecDesc = new RecordDescriptor(tokenKeyPairFields);
-
- int[] fieldsToTokenize = new int[numSecondaryKeys];
- for (i = 0; i < numSecondaryKeys; i++)
- fieldsToTokenize[i] = i;
-
- int[] primaryKeyFields = new int[numPrimaryKeys];
- for (i = 0; i < numPrimaryKeys; i++)
- primaryKeyFields[i] = numSecondaryKeys + i;
-
- IBinaryTokenizerFactory tokenizerFactory = AqlBinaryTokenizerFactoryProvider.INSTANCE
- .getTokenizerFactory(fieldsToTokenizeType);
- BinaryTokenizerOperatorDescriptor tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec,
- tokenKeyPairRecDesc, tokenizerFactory, fieldsToTokenize, primaryKeyFields);
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = datasetDecls
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, secondaryIndexName);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, tokenizerOp,
- secondarySplitsAndConstraint.second);
-
- // ---------- END TOKENIZER OP
-
- // ---------- START EXTERNAL SORT OP
-
- IBinaryComparatorFactory[] tokenKeyPairComparatorFactories = new IBinaryComparatorFactory[numTokenKeyPairFields];
- tokenKeyPairComparatorFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
- fieldsToTokenizeType, OrderKind.ASC);
- for (i = 0; i < numPrimaryKeys; i++)
- tokenKeyPairComparatorFactories[i + 1] = primaryComparatorFactories[i];
-
- int[] sortFields = new int[numTokenKeyPairFields]; // <token, primary
- // key a, primary
- // key b, etc.>
- for (i = 0; i < numTokenKeyPairFields; i++)
- sortFields[i] = i;
-
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> sorterSplitsAndConstraint = datasetDecls
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
-
- ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
- physicalOptimizationConfig.getMaxFramesExternalSort(), sortFields, tokenKeyPairComparatorFactories,
- secondaryRecDesc);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp,
- sorterSplitsAndConstraint.second);
-
- // ---------- END EXTERNAL SORT OP
-
- // ---------- START SECONDARY INDEX BULK LOAD
-
- ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numTokenKeyPairFields];
- secondaryTypeTraits[0] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(fieldsToTokenizeType);
- for (i = 0; i < numPrimaryKeys; i++)
- secondaryTypeTraits[i + 1] = primaryTypeTraits[i];
-
- int[] fieldPermutation = new int[numSecondaryKeys + numPrimaryKeys];
- for (i = 0; i < numTokenKeyPairFields; i++)
- fieldPermutation[i] = i;
-
- TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(
- spec, storageManager, treeRegistryProvider,
- secondarySplitsAndConstraint.first, secondaryTypeTraits,
- tokenKeyPairComparatorFactories, fieldPermutation, 0.7f,
- new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryBulkLoadOp,
- secondarySplitsAndConstraint.second);
-
- // ---------- END SECONDARY INDEX BULK LOAD
-
- // ---------- START CONNECT THE OPERATORS
-
- spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primarySearchOp, 0);
-
- spec.connect(new OneToOneConnectorDescriptor(spec), primarySearchOp, 0, asterixAssignOp, 0);
-
- spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, tokenizerOp, 0);
-
- spec.connect(new OneToOneConnectorDescriptor(spec), tokenizerOp, 0, sortOp, 0);
-
- spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
-
- spec.addRoot(secondaryBulkLoadOp);
-
- // ---------- END CONNECT THE OPERATORS
- spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
- return spec;
- }
-
public static void main(String[] args) throws Exception {
String host;
String appName;
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
new file mode 100644
index 0000000..0bfb209
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
@@ -0,0 +1,183 @@
+package edu.uci.ics.asterix.file;
+
+import java.util.List;
+
+import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
+import edu.uci.ics.asterix.common.context.AsterixStorageManagerInterface;
+import edu.uci.ics.asterix.common.context.AsterixTreeRegistryProvider;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.translator.DmlTranslator.CompiledCreateIndexStatement;
+import edu.uci.ics.hyracks.algebricks.core.algebra.data.ISerializerDeserializerProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.impl.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.AssignRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraintHelper;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import edu.uci.ics.hyracks.algebricks.core.utils.Pair;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+
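+// Builds the job that bulk loads a secondary BTree index:
+// primary scan -> assign (extract keys) -> sort -> bulk load.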
+@SuppressWarnings("rawtypes")
+public class SecondaryBTreeCreator extends SecondaryIndexCreator {
+
+ protected SecondaryBTreeCreator(PhysicalOptimizationConfig physOptConf) {
+ super(physOptConf);
+ }
+
+ @Override
+ public JobSpecification createJobSpec(CompiledCreateIndexStatement createIndexStmt,
+ AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
+ JobSpecification spec = new JobSpecification();
+ String datasetName = createIndexStmt.getDatasetName();
+ String secondaryIndexName = createIndexStmt.getIndexName();
+ AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
+ if (compiledDatasetDecl == null) {
+ throw new AlgebricksException("Unknown dataset " + datasetName);
+ }
+ if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
+ throw new AsterixException("Cannot index an external dataset (" + datasetName + ").");
+ }
+ ARecordType itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());
+ ISerializerDeserializer payloadSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(itemType);
+ int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
+
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
+
+ // Create dummy key provider for feeding the primary index scan.
+ AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+ spec, keyProviderOp, splitProviderAndConstraint.second);
+
+ // Create primary index scan op.
+ BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(
+ spec, metadata, compiledDatasetDecl, itemType, payloadSerde,
+ splitProviderAndConstraint.first);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+ spec, keyProviderOp, splitProviderAndConstraint.second);
+
+ // ---------- START ASSIGN OP
+
+ List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
+ int numSecondaryKeys = secondaryKeyFields.size();
+ ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
+ IEvaluatorFactory[] evalFactories = new IEvaluatorFactory[numSecondaryKeys];
+ IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys
+ + numPrimaryKeys];
+ ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
+ ISerializerDeserializerProvider serdeProvider = metadata.getFormat()
+ .getSerdeProvider();
+ for (int i = 0; i < numSecondaryKeys; i++) {
+ evalFactories[i] = metadata.getFormat().getFieldAccessEvaluatorFactory(itemType, secondaryKeyFields.get(i),
+ numPrimaryKeys);
+ IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(i), itemType);
+ ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
+ secondaryRecFields[i] = keySerde;
+ secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+ keyType, OrderKind.ASC);
+ secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ }
+ // Fill in serializers and comparators for primary index fields.
+ RecordDescriptor primaryRecDesc = primaryScanOp.getRecordDescriptor();
+ IBinaryComparatorFactory[] primaryComparatorFactories = primaryScanOp.getTreeIndexComparatorFactories();
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
+ secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
+ secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
+ }
+ RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
+
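+        // The assign op writes the extracted secondary keys after the payload
+        // column, then projects them in front of the primary keys.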
+ int[] outColumns = new int[numSecondaryKeys];
+ int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
+ for (int i = 0; i < numSecondaryKeys; i++) {
+ outColumns[i] = numPrimaryKeys + i + 1;
+ }
+ int projCount = 0;
+ for (int i = 0; i < numSecondaryKeys; i++) {
+ projectionList[projCount++] = numPrimaryKeys + i + 1;
+ }
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ projectionList[projCount++] = i;
+ }
+
+ AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
+ AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
+ new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
+ splitProviderAndConstraint.second);
+
+ // ---------- END ASSIGN OP
+
+ // ---------- START EXTERNAL SORT OP
+
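+        // Sort on <secondary keys, primary keys> so tuples arrive in index
+        // order for bulk loading.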
+ int[] sortFields = new int[numSecondaryKeys + numPrimaryKeys];
+ for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++) {
+ sortFields[i] = i;
+ }
+ ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
+ physOptConf.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories,
+ secondaryRecDesc);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp,
+ splitProviderAndConstraint.second);
+
+ // ---------- END EXTERNAL SORT OP
+
+ // ---------- START SECONDARY INDEX BULK LOAD
+
+ int[] fieldPermutation = new int[numSecondaryKeys + numPrimaryKeys];
+ for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++) {
+ fieldPermutation[i] = i;
+ }
+
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, secondaryIndexName);
+
+        // The 0.7f fill factor below is a stand-in for GlobalConfig.DEFAULT_BTREE_FILL_FACTOR.
+ TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(
+ spec, AsterixStorageManagerInterface.INSTANCE, AsterixTreeRegistryProvider.INSTANCE,
+ secondarySplitsAndConstraint.first, secondaryTypeTraits,
+ secondaryComparatorFactories, fieldPermutation, 0.7f,
+ new BTreeDataflowHelperFactory(),
+ NoOpOperationCallbackProvider.INSTANCE);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryBulkLoadOp,
+ secondarySplitsAndConstraint.second);
+
+ // ---------- END SECONDARY INDEX BULK LOAD
+
+ // Connect the operators.
+ spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
+ spec.addRoot(secondaryBulkLoadOp);
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ return spec;
+
+ }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
new file mode 100644
index 0000000..fb17ddc
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
@@ -0,0 +1,171 @@
+package edu.uci.ics.asterix.file;
+
+import java.io.DataOutput;
+import java.util.List;
+
+import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
+import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
+import edu.uci.ics.asterix.common.context.AsterixStorageManagerInterface;
+import edu.uci.ics.asterix.common.context.AsterixTreeRegistryProvider;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
+import edu.uci.ics.asterix.metadata.MetadataException;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.translator.DmlTranslator.CompiledCreateIndexStatement;
+import edu.uci.ics.hyracks.algebricks.core.algebra.data.ISerializerDeserializerProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import edu.uci.ics.hyracks.algebricks.core.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.utils.Triple;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+
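+// Base class for the secondary-index creation jobs. Subclasses build the
+// index-specific job spec; the shared operator builders live here.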
+@SuppressWarnings("rawtypes")
+public abstract class SecondaryIndexCreator {
+ protected final PhysicalOptimizationConfig physOptConf;
+
+ protected int numPrimaryKeys;
+ protected int numSecondaryKeys;
+ protected AqlCompiledMetadataDeclarations metadata;
+ protected String datasetName;
+ protected ARecordType itemType;
+ protected ISerializerDeserializer payloadSerde;
+ protected IFileSplitProvider primaryFileSplitProvider;
+    protected AlgebricksPartitionConstraint primaryPartitionConstraint;
+ protected String secondaryIndexName;
+
+ // Prevent public construction.
+ protected SecondaryIndexCreator(PhysicalOptimizationConfig physOptConf) {
+ this.physOptConf = physOptConf;
+ }
+
+    public static SecondaryIndexCreator createIndexCreator(IndexType indexType,
+            PhysicalOptimizationConfig physOptConf) throws AsterixException {
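+        // Pick the creator subclass for the requested index type; unsupported
+        // types fail fast.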
+ switch (indexType) {
+ case BTREE: {
+ return new SecondaryBTreeCreator(physOptConf);
+ }
+ case RTREE: {
+ return new SecondaryRTreeCreator(physOptConf);
+ }
+ case KEYWORD: {
+ return new SecondaryInvertedIndexCreator(physOptConf);
+ }
+ default: {
+ throw new AsterixException("Unknown Index Type: " + indexType);
+ }
+ }
+ }
+
+ public abstract JobSpecification createJobSpec(CompiledCreateIndexStatement createIndexStmt,
+ AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException;
+
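+    // Common initialization: resolves the dataset and item type, and caches the
+    // primary index's file splits and partition constraint for subclasses.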
+ protected void init(CompiledCreateIndexStatement createIndexStmt) throws AsterixException, AlgebricksException {
+ datasetName = createIndexStmt.getDatasetName();
+ secondaryIndexName = createIndexStmt.getIndexName();
+ AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
+ if (compiledDatasetDecl == null) {
+ throw new AsterixException("Unknown dataset " + datasetName);
+ }
+ if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
+ throw new AsterixException("Cannot index an external dataset (" + datasetName + ").");
+ }
+ itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());
+ payloadSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(itemType);
+ numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
+ primaryFileSplitProvider = splitProviderAndConstraint.first;
+        primaryPartitionConstraint = splitProviderAndConstraint.second;
+ }
+
+ protected AbstractOperatorDescriptor createDummyKeyProviderOp(
+ JobSpecification spec) throws AsterixException, AlgebricksException {
+ // Build dummy tuple containing one field with a dummy value inside.
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
+ DataOutput dos = tb.getDataOutput();
+ tb.reset();
+ try {
+ // Serialize dummy value into a field.
+ IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
+ } catch (HyracksDataException e) {
+ throw new AsterixException(e);
+ }
+ // Add dummy field.
+ tb.addFieldEndOffset();
+ ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
+ RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+ ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(
+ spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
+ tb.getSize());
+ return keyProviderOp;
+ }
+
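+    // Scans the dataset's primary BTree over the full key range (null low/high
+    // key fields mean -infinity/+infinity).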
+ protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(
+ JobSpecification spec, AqlCompiledMetadataDeclarations metadata,
+ AqlCompiledDatasetDecl compiledDatasetDecl, ARecordType itemType,
+ ISerializerDeserializer payloadSerde,
+ IFileSplitProvider splitProvider) throws AlgebricksException,
+ MetadataException {
+ int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(
+ compiledDatasetDecl).size();
+ ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
+ ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
+ IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
+ ISerializerDeserializerProvider serdeProvider = metadata.getFormat()
+ .getSerdeProvider();
+ List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
+ .getPartitioningFunctions(compiledDatasetDecl);
+ int i = 0;
+ for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : partitioningFunctions) {
+ IAType keyType = evalFactoryAndType.third;
+ ISerializerDeserializer keySerde = serdeProvider
+ .getSerializerDeserializer(keyType);
+ primaryRecFields[i] = keySerde;
+ primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+ .getBinaryComparatorFactory(keyType, OrderKind.ASC);
+ primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE
+ .getTypeTrait(keyType);
+ ++i;
+ }
+ primaryRecFields[numPrimaryKeys] = payloadSerde;
+ primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE
+ .getTypeTrait(itemType);
+
+ // -Infinity
+ int[] lowKeyFields = null;
+ // +Infinity
+ int[] highKeyFields = null;
+ RecordDescriptor primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
+
+ BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(
+ spec, primaryRecDesc, AsterixStorageManagerInterface.INSTANCE,
+ AsterixTreeRegistryProvider.INSTANCE, splitProvider,
+ primaryTypeTraits, primaryComparatorFactories, lowKeyFields,
+ highKeyFields, true, true, new BTreeDataflowHelperFactory(),
+ NoOpOperationCallbackProvider.INSTANCE);
+ return primarySearchOp;
+ }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
new file mode 100644
index 0000000..4eb981d
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
@@ -0,0 +1,308 @@
+package edu.uci.ics.asterix.file;
+
+import java.io.DataOutput;
+import java.util.List;
+
+import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
+import edu.uci.ics.asterix.common.context.AsterixStorageManagerInterface;
+import edu.uci.ics.asterix.common.context.AsterixTreeRegistryProvider;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import edu.uci.ics.asterix.formats.nontagged.AqlBinaryTokenizerFactoryProvider;
+import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.translator.DmlTranslator.CompiledCreateIndexStatement;
+import edu.uci.ics.hyracks.algebricks.core.algebra.data.ISerializerDeserializerProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.impl.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.AssignRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraintHelper;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import edu.uci.ics.hyracks.algebricks.core.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.utils.Triple;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+
+@SuppressWarnings("rawtypes")
+public class SecondaryInvertedIndexCreator extends SecondaryIndexCreator {
+
+ protected SecondaryInvertedIndexCreator(PhysicalOptimizationConfig physOptConf) {
+ super(physOptConf);
+ }
+
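+ /**
+ * Builds the bulk-load job for a keyword inverted index. The dataflow, as
+ * wired up by the connects at the end of this method, is: dummy key
+ * provider -> primary index scan -> assign (extract secondary key fields)
+ * -> tokenizer -> external sort on <token, primary key> -> bulk load of
+ * the secondary index.
+ */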
+ @Override
+ public JobSpecification createJobSpec(CompiledCreateIndexStatement createIndexStmt,
+ AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
+ JobSpecification spec = new JobSpecification();
+
+ String primaryIndexName = createIndexStmt.getDatasetName();
+ String secondaryIndexName = createIndexStmt.getIndexName();
+
+ AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(primaryIndexName);
+ if (compiledDatasetDecl == null) {
+ throw new AsterixException("Could not find dataset " + primaryIndexName);
+ }
+
+ if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
+ throw new AsterixException("Cannot index an external dataset (" + primaryIndexName + ").");
+ }
+ ARecordType itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());
+ ISerializerDeserializerProvider serdeProvider = metadata.getFormat().getSerdeProvider();
+ ISerializerDeserializer payloadSerde = serdeProvider.getSerializerDeserializer(itemType);
+
+ int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
+
+ // Sanity check: the inverted keyword index requires a non-composite primary key.
+ if (numPrimaryKeys > 1) {
+ throw new AsterixException("Cannot create inverted keyword index on dataset with composite primary key.");
+ }
+
+ // Sanity check: all fields to tokenize must have the same type.
+ IAType fieldsToTokenizeType = AqlCompiledIndexDecl
+ .keyFieldType(createIndexStmt.getKeyFields().get(0), itemType);
+ for (String fieldName : createIndexStmt.getKeyFields()) {
+ IAType nextFieldToTokenizeType = AqlCompiledIndexDecl.keyFieldType(fieldName, itemType);
+ if (nextFieldToTokenizeType.getTypeTag() != fieldsToTokenizeType.getTypeTag()) {
+ throw new AsterixException(
+ "Cannot create inverted keyword index. Fields to tokenize must be of the same type.");
+ }
+ }
+
+ // ---------- START GENERAL BTREE STUFF
+
+ IIndexRegistryProvider<IIndex> treeRegistryProvider = AsterixTreeRegistryProvider.INSTANCE;
+ IStorageManagerInterface storageManager = AsterixStorageManagerInterface.INSTANCE;
+
+ // ---------- END GENERAL BTREE STUFF
+
+ // ---------- START KEY PROVIDER OP
+
+ // TODO: should actually be an empty tuple source
+ // Build a tuple with a single dummy field; the primary scan below ignores it
+ // and scans the full key range.
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(1); // just one dummy field
+ DataOutput dos = tb.getDataOutput();
+
+ try {
+ tb.reset();
+ IntegerSerializerDeserializer.INSTANCE.serialize(0, dos); // dummy field
+ tb.addFieldEndOffset();
+ } catch (HyracksDataException e) {
+ throw new AsterixException(e);
+ }
+
+ ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
+ RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> keyProviderSplitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
+
+ ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+ keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp,
+ keyProviderSplitsAndConstraint.second);
+
+ // ---------- END KEY PROVIDER OP
+
+ // ---------- START PRIMARY INDEX SCAN
+
+ ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
+ IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
+ ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
+ int i = 0;
+ for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
+ .getPartitioningFunctions(compiledDatasetDecl)) {
+ IAType keyType = evalFactoryAndType.third;
+ ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
+ primaryRecFields[i] = keySerde;
+ primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+ keyType, OrderKind.ASC);
+ primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ ++i;
+ }
+ primaryRecFields[numPrimaryKeys] = payloadSerde;
+ primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+
+ int[] lowKeyFields = null; // -infinity
+ int[] highKeyFields = null; // +infinity
+ RecordDescriptor primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
+
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
+
+ BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
+ storageManager, treeRegistryProvider, primarySplitsAndConstraint.first, primaryTypeTraits,
+ primaryComparatorFactories, lowKeyFields, highKeyFields, true, true, new BTreeDataflowHelperFactory(),
+ NoOpOperationCallbackProvider.INSTANCE);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
+ primarySplitsAndConstraint.second);
+
+ // ---------- END PRIMARY INDEX SCAN
+
+ // ---------- START ASSIGN OP
+
+ List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
+ int numSecondaryKeys = secondaryKeyFields.size();
+ ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
+ IEvaluatorFactory[] evalFactories = new IEvaluatorFactory[numSecondaryKeys];
+ for (i = 0; i < numSecondaryKeys; i++) {
+ evalFactories[i] = metadata.getFormat().getFieldAccessEvaluatorFactory(itemType,
+ secondaryKeyFields.get(i), numPrimaryKeys);
+ IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(i), itemType);
+ ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
+ secondaryRecFields[i] = keySerde;
+ }
+ // Fill in the serializers for the primary key fields.
+ for (i = 0; i < numPrimaryKeys; i++) {
+ secondaryRecFields[numSecondaryKeys + i] = primaryRecFields[i];
+ }
+ RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
+
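+ // The primary scan output is <pk_0..pk_n-1, payload>, so the payload sits at
+ // index numPrimaryKeys and the assign's new columns start at numPrimaryKeys + 1.
+ // The projection keeps the secondary keys followed by the primary keys and
+ // drops the payload.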
+ int[] outColumns = new int[numSecondaryKeys];
+ int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
+ for (i = 0; i < numSecondaryKeys; i++) {
+ outColumns[i] = numPrimaryKeys + i + 1;
+ }
+ int projCount = 0;
+ for (i = 0; i < numSecondaryKeys; i++) {
+ projectionList[projCount++] = numPrimaryKeys + i + 1;
+ }
+ for (i = 0; i < numPrimaryKeys; i++) {
+ projectionList[projCount++] = i;
+ }
+
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> assignSplitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
+
+ AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
+ AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
+ new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
+ assignSplitsAndConstraint.second);
+
+ // ---------- END ASSIGN OP
+
+ // ---------- START TOKENIZER OP
+
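+ // The tokenizer emits one <token, primary key> pair per token, so its output
+ // record has a single token field followed by the primary key fields.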
+ int numTokenKeyPairFields = numPrimaryKeys + 1;
+
+ ISerializerDeserializer[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
+ tokenKeyPairFields[0] = serdeProvider.getSerializerDeserializer(fieldsToTokenizeType);
+ for (i = 0; i < numPrimaryKeys; i++)
+ tokenKeyPairFields[i + 1] = secondaryRecFields[numSecondaryKeys + i];
+ RecordDescriptor tokenKeyPairRecDesc = new RecordDescriptor(tokenKeyPairFields);
+
+ int[] fieldsToTokenize = new int[numSecondaryKeys];
+ for (i = 0; i < numSecondaryKeys; i++)
+ fieldsToTokenize[i] = i;
+
+ int[] primaryKeyFields = new int[numPrimaryKeys];
+ for (i = 0; i < numPrimaryKeys; i++)
+ primaryKeyFields[i] = numSecondaryKeys + i;
+
+ IBinaryTokenizerFactory tokenizerFactory = AqlBinaryTokenizerFactoryProvider.INSTANCE
+ .getTokenizerFactory(fieldsToTokenizeType);
+ BinaryTokenizerOperatorDescriptor tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec,
+ tokenKeyPairRecDesc, tokenizerFactory, fieldsToTokenize, primaryKeyFields);
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, secondaryIndexName);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, tokenizerOp,
+ secondarySplitsAndConstraint.second);
+
+ // ---------- END TOKENIZER OP
+
+ // ---------- START EXTERNAL SORT OP
+
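+ // The B-tree bulk load below expects its input in index order, so the
+ // <token, primary key> pairs are sorted on all fields first.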
+ IBinaryComparatorFactory[] tokenKeyPairComparatorFactories = new IBinaryComparatorFactory[numTokenKeyPairFields];
+ tokenKeyPairComparatorFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+ fieldsToTokenizeType, OrderKind.ASC);
+ for (i = 0; i < numPrimaryKeys; i++)
+ tokenKeyPairComparatorFactories[i + 1] = primaryComparatorFactories[i];
+
+ // Sort on all fields: <token, primary key 1, primary key 2, ...>.
+ int[] sortFields = new int[numTokenKeyPairFields];
+ for (i = 0; i < numTokenKeyPairFields; i++)
+ sortFields[i] = i;
+
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> sorterSplitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
+
+ ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
+ physOptConf.getMaxFramesExternalSort(), sortFields, tokenKeyPairComparatorFactories,
+ tokenKeyPairRecDesc);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp,
+ sorterSplitsAndConstraint.second);
+
+ // ---------- END EXTERNAL SORT OP
+
+ // ---------- START SECONDARY INDEX BULK LOAD
+
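+ // Physically, the inverted index is bulk loaded as a B-tree over the sorted
+ // <token, primary key> pairs (note the BTreeDataflowHelperFactory below);
+ // 0.7f is the bulk-load fill factor.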
+ ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numTokenKeyPairFields];
+ secondaryTypeTraits[0] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(fieldsToTokenizeType);
+ for (i = 0; i < numPrimaryKeys; i++)
+ secondaryTypeTraits[i + 1] = primaryTypeTraits[i];
+
+ // The bulk-loaded records are the <token, primary key> pairs, loaded in order.
+ int[] fieldPermutation = new int[numTokenKeyPairFields];
+ for (i = 0; i < numTokenKeyPairFields; i++)
+ fieldPermutation[i] = i;
+
+ TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(
+ spec, storageManager, treeRegistryProvider,
+ secondarySplitsAndConstraint.first, secondaryTypeTraits,
+ tokenKeyPairComparatorFactories, fieldPermutation, 0.7f,
+ new BTreeDataflowHelperFactory(),
+ NoOpOperationCallbackProvider.INSTANCE);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryBulkLoadOp,
+ secondarySplitsAndConstraint.second);
+
+ // ---------- END SECONDARY INDEX BULK LOAD
+
+ // ---------- START CONNECT THE OPERATORS
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primarySearchOp, 0);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), primarySearchOp, 0, asterixAssignOp, 0);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, tokenizerOp, 0);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), tokenizerOp, 0, sortOp, 0);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
+
+ spec.addRoot(secondaryBulkLoadOp);
+
+ // ---------- END CONNECT THE OPERATORS
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ return spec;
+ }
+
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
new file mode 100644
index 0000000..396616d
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
@@ -0,0 +1,188 @@
+package edu.uci.ics.asterix.file;
+
+import java.util.List;
+
+import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
+import edu.uci.ics.asterix.common.context.AsterixStorageManagerInterface;
+import edu.uci.ics.asterix.common.context.AsterixTreeRegistryProvider;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
+import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+import edu.uci.ics.asterix.translator.DmlTranslator.CompiledCreateIndexStatement;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.impl.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.AssignRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraintHelper;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import edu.uci.ics.hyracks.algebricks.core.utils.Pair;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeDataflowHelperFactory;
+
+@SuppressWarnings("rawtypes")
+public class SecondaryRTreeCreator extends SecondaryIndexCreator {
+
+ protected SecondaryRTreeCreator(PhysicalOptimizationConfig physOptConf) {
+ super(physOptConf);
+ }
+
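+ /**
+ * Builds the bulk-load job for an R-tree index: dummy key provider ->
+ * primary index scan -> assign (compute the key's MBR) -> bulk load of the
+ * R-tree. Unlike the B-tree-based indexes, no sort is inserted before the
+ * bulk load here.
+ */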
+ @Override
+ public JobSpecification createJobSpec(CompiledCreateIndexStatement createIndexStmt,
+ AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
+ JobSpecification spec = new JobSpecification();
+ String datasetName = createIndexStmt.getDatasetName();
+ String secondaryIndexName = createIndexStmt.getIndexName();
+ AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
+ if (compiledDatasetDecl == null) {
+ throw new AlgebricksException("Unknown dataset " + datasetName);
+ }
+ if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
+ throw new AsterixException("Cannot index an external dataset (" + datasetName + ").");
+ }
+ ARecordType itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());
+ ISerializerDeserializer payloadSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(itemType);
+ int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
+
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
+
+ // Create dummy key provider for feeding the primary index scan.
+ AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+ spec, keyProviderOp, splitProviderAndConstraint.second);
+
+ // Create primary index scan op.
+ BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(
+ spec, metadata, compiledDatasetDecl, itemType, payloadSerde,
+ splitProviderAndConstraint.first);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+ spec, primaryScanOp, splitProviderAndConstraint.second);
+
+ // ---------- START ASSIGN OP
+
+ List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
+ int numSecondaryKeys = secondaryKeyFields.size();
+
+ if (numSecondaryKeys != 1) {
+ throw new AsterixException("Cannot use " + numSecondaryKeys
+ + " fields as a key for the R-tree index. The R-tree index requires exactly one key field.");
+ }
+
+ IAType spatialType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(0), itemType);
+ if (spatialType == null) {
+ throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
+ }
+
+ int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
+ int numNestedSecondaryKeyFields = dimension * 2;
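+ // An MBR is stored as one min and one max coordinate per dimension, hence
+ // 2 * dimension nested key fields (e.g. 4 coordinates for a 2D key).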
+
+ IEvaluatorFactory[] evalFactories = metadata.getFormat().createMBRFactory(itemType, secondaryKeyFields.get(0),
+ numPrimaryKeys, dimension);
+
+ ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
+ + numNestedSecondaryKeyFields];
+ IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
+ ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
+ IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+
+ IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
+ for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
+ ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(nestedKeyType);
+ secondaryRecFields[i] = keySerde;
+ secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+ nestedKeyType, OrderKind.ASC);
+ secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
+ valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
+ }
+
+ // Fill in the serializers and type traits for the primary key fields.
+ RecordDescriptor primaryRecDesc = primaryScanOp.getRecordDescriptor();
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecDesc.getFields()[i];
+ secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryRecDesc.getTypeTraits()[i];
+ }
+ RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
+
+ int[] outColumns = new int[numNestedSecondaryKeyFields];
+ int[] projectionList = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
+ for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
+ outColumns[i] = numPrimaryKeys + i + 1;
+ }
+ int projCount = 0;
+ for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
+ projectionList[projCount++] = numPrimaryKeys + i + 1;
+ }
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ projectionList[projCount++] = i;
+ }
+
+ AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
+ AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
+ new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
+
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
+ splitProviderAndConstraint.second);
+
+ // ---------- END ASSIGN OP
+
+ // ---------- START SECONDARY INDEX BULK LOAD
+
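+ // 0.7f is the bulk-load fill factor; the RTreeDataflowHelperFactory carries
+ // the value-provider factories the R-tree needs to interpret the MBR fields.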
+ int[] fieldPermutation = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
+ for (int i = 0; i < numNestedSecondaryKeyFields + numPrimaryKeys; i++) {
+ fieldPermutation[i] = i;
+ }
+
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, secondaryIndexName);
+
+ TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(
+ spec, AsterixStorageManagerInterface.INSTANCE, AsterixTreeRegistryProvider.INSTANCE,
+ secondarySplitsAndConstraint.first, secondaryTypeTraits,
+ secondaryComparatorFactories, fieldPermutation, 0.7f,
+ new RTreeDataflowHelperFactory(valueProviderFactories),
+ NoOpOperationCallbackProvider.INSTANCE);
+
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryBulkLoadOp,
+ secondarySplitsAndConstraint.second);
+
+ // ---------- END SECONDARY INDEX BULK LOAD
+
+ // Connect the operators.
+ spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, secondaryBulkLoadOp, 0);
+ spec.addRoot(secondaryBulkLoadOp);
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ return spec;
+ }
+
+}