Finished sharing job-generation code between BTree and RTree secondary index creation (InvertedIndex will be cleaned up next).
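
The shared path now lives in SecondaryIndexCreator: it builds the dummy key provider, primary BTree scan, assign, sort, and tree bulk-load operators, while SecondaryRTreeCreator overrides setSecondaryRecDescAndComparators() for its MBR keys and each subclass passes its own IIndexDataflowHelperFactory and fill factor into the shared createTreeIndexBulkLoadOp(). A minimal sketch of that shape, using hypothetical stand-in types and signatures instead of the real Hyracks/Algebricks classes (illustration only, not the actual code):

    import java.util.List;

    // Stand-ins for illustration; the real classes use the Hyracks/Algebricks types in the diff below.
    abstract class SecondaryIndexCreatorSketch {
        // Shared job-gen pipeline: key provider -> primary scan -> assign -> sort -> bulk load.
        final String createJobSpec(List<String> secondaryKeyFields, String dataflowHelperFactory, float fillFactor) {
            setSecondaryRecDescAndComparators(secondaryKeyFields); // index-specific key layout
            return "keyProvider -> primaryScan -> assign -> sort -> bulkLoad("
                    + dataflowHelperFactory + ", fillFactor=" + fillFactor + ")";
        }

        // Default layout: one entry per declared key field, followed by the primary-key fields.
        protected void setSecondaryRecDescAndComparators(List<String> secondaryKeyFields) {
        }
    }

    // BTree creator keeps the default key layout and only picks its dataflow helper.
    class SecondaryBTreeCreatorSketch extends SecondaryIndexCreatorSketch {
    }

    // RTree creator overrides the key layout for its nested MBR fields.
    class SecondaryRTreeCreatorSketch extends SecondaryIndexCreatorSketch {
        @Override
        protected void setSecondaryRecDescAndComparators(List<String> keys) {
            // 2 * numDimensions nested MBR fields derived from the single spatial key field.
        }
    }

In the real hunks below, SecondaryBTreeCreator passes new BTreeDataflowHelperFactory() and SecondaryRTreeCreator passes new RTreeDataflowHelperFactory(valueProviderFactories), both with BTree.DEFAULT_FILL_FACTOR, into the shared createTreeIndexBulkLoadOp().
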
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix-fix-issue-9@211 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
index 9f3d508..cb292a2 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
@@ -1,43 +1,22 @@
package edu.uci.ics.asterix.file;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.context.AsterixStorageManagerInterface;
-import edu.uci.ics.asterix.common.context.AsterixTreeRegistryProvider;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
-import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
-import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.translator.DmlTranslator.CompiledCreateIndexStatement;
-import edu.uci.ics.hyracks.algebricks.core.algebra.data.ISerializerDeserializerProvider;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
-import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.impl.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.AssignRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraintHelper;
import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
-import edu.uci.ics.hyracks.algebricks.core.utils.Pair;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-@SuppressWarnings("rawtypes")
public class SecondaryBTreeCreator extends SecondaryIndexCreator {
protected SecondaryBTreeCreator(PhysicalOptimizationConfig physOptConf) {
@@ -52,88 +31,20 @@
// Create dummy key provider for feeding the primary index scan.
AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
- spec, keyProviderOp, primaryPartitionConstrant);
// Create primary index scan op.
BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
- spec, keyProviderOp, primaryPartitionConstrant);
- // ---------- START ASSIGN OP
-
- List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
- int numSecondaryKeys = secondaryKeyFields.size();
- ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
- IEvaluatorFactory[] evalFactories = new IEvaluatorFactory[numSecondaryKeys];
- IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys
- + numPrimaryKeys];
- ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
- ISerializerDeserializerProvider serdeProvider = metadata.getFormat()
- .getSerdeProvider();
- for (int i = 0; i < numSecondaryKeys; i++) {
- evalFactories[i] = metadata.getFormat().getFieldAccessEvaluatorFactory(itemType, secondaryKeyFields.get(i),
- numPrimaryKeys);
- IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(i), itemType);
- ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
- secondaryRecFields[i] = keySerde;
- secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
- keyType, OrderKind.ASC);
- secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- }
- // Fill in serializers and comparators for primary index fields.
- RecordDescriptor primaryRecDesc = primaryScanOp.getRecordDescriptor();
- IBinaryComparatorFactory[] primaryComparatorFactories = primaryScanOp.getTreeIndexComparatorFactories();
- for (int i = 0; i < numPrimaryKeys; i++) {
- secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
- secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
- secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
- }
- RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
-
- int[] outColumns = new int[numSecondaryKeys];
- int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
- for (int i = 0; i < numSecondaryKeys; i++) {
- outColumns[i] = numPrimaryKeys + i + 1;
- }
- int projCount = 0;
- for (int i = 0; i < numSecondaryKeys; i++) {
- projectionList[projCount++] = numPrimaryKeys + i + 1;
- }
- for (int i = 0; i < numPrimaryKeys; i++) {
- projectionList[projCount++] = i;
- }
-
- AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
- AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
- new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
- primaryPartitionConstrant);
-
- // ---------- END ASSIGN OP
-
+ // Assign op.
+ AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, primaryScanOp, numSecondaryKeys);
+
// Sort by secondary keys.
ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstrant);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstraint);
- // ---------- START SECONDARY INDEX BULK LOAD
-
- int[] fieldPermutation = new int[numSecondaryKeys + numPrimaryKeys];
- for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++) {
- fieldPermutation[i] = i;
- }
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, secondaryIndexName);
- TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(
- spec, AsterixStorageManagerInterface.INSTANCE, AsterixTreeRegistryProvider.INSTANCE,
- secondarySplitsAndConstraint.first, secondaryTypeTraits,
- secondaryComparatorFactories, fieldPermutation, 0.7f,
- new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryBulkLoadOp,
- secondarySplitsAndConstraint.second);
-
- // ---------- END SECONDARY INDEX BULK LOAD
+ // Create secondary BTree bulk load op.
+ TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec, numSecondaryKeys,
+ new BTreeDataflowHelperFactory(), BTree.DEFAULT_FILL_FACTOR);
// Connect the operators.
spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
index 5d2a71f..e331d10 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
@@ -11,6 +11,7 @@
import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
+import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
@@ -45,6 +46,8 @@
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
@SuppressWarnings("rawtypes")
@@ -59,9 +62,11 @@
protected ARecordType itemType;
protected ISerializerDeserializer payloadSerde;
protected IFileSplitProvider primaryFileSplitProvider;
- protected AlgebricksPartitionConstraint primaryPartitionConstrant;
+ protected AlgebricksPartitionConstraint primaryPartitionConstraint;
protected String secondaryIndexName;
-
+
+ protected IBinaryComparatorFactory[] primaryComparatorFactories;
+ protected RecordDescriptor primaryRecDesc;
protected IBinaryComparatorFactory[] secondaryComparatorFactories;
protected RecordDescriptor secondaryRecDesc;
protected IEvaluatorFactory[] evalFactories;
@@ -106,10 +111,62 @@
payloadSerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(itemType);
numPrimaryKeys = DatasetUtils.getPartitioningFunctions(datasetDecl).size();
+ numSecondaryKeys = createIndexStmt.getKeyFields().size();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndConstraint = metadata
.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
primaryFileSplitProvider = splitProviderAndConstraint.first;
- primaryPartitionConstrant = splitProviderAndConstraint.second;
+ primaryPartitionConstraint = splitProviderAndConstraint.second;
+ // Must be called in this order.
+ setPrimaryRecDescAndComparators();
+ setSecondaryRecDescAndComparators(createIndexStmt.getKeyFields());
+ }
+
+ protected void setPrimaryRecDescAndComparators() throws AlgebricksException {
+ int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(datasetDecl).size();
+ ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
+ ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
+ primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
+ ISerializerDeserializerProvider serdeProvider = metadata.getFormat().getSerdeProvider();
+ List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
+ .getPartitioningFunctions(datasetDecl);
+ int i = 0;
+ for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : partitioningFunctions) {
+ IAType keyType = evalFactoryAndType.third;
+ ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
+ primaryRecFields[i] = keySerde;
+ primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+ keyType, OrderKind.ASC);
+ primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ ++i;
+ }
+ primaryRecFields[numPrimaryKeys] = payloadSerde;
+ primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+ primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
+ }
+
+ protected void setSecondaryRecDescAndComparators(List<String> secondaryKeyFields) throws AlgebricksException, AsterixException {
+ evalFactories = new IEvaluatorFactory[numSecondaryKeys];
+ secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys + numPrimaryKeys];
+ ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
+ ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
+ ISerializerDeserializerProvider serdeProvider = metadata.getFormat().getSerdeProvider();
+ for (int i = 0; i < numSecondaryKeys; i++) {
+ evalFactories[i] = metadata.getFormat().getFieldAccessEvaluatorFactory(itemType, secondaryKeyFields.get(i),
+ numPrimaryKeys);
+ IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(i), itemType);
+ ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
+ secondaryRecFields[i] = keySerde;
+ secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+ keyType, OrderKind.ASC);
+ secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ }
+ // Add serializers and comparators for primary index fields.
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
+ secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
+ secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
+ }
+ secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
}
protected AbstractOperatorDescriptor createDummyKeyProviderOp(
@@ -132,111 +189,33 @@
spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
tb.getSize());
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
- spec, keyProviderOp, primaryPartitionConstrant);
+ spec, keyProviderOp, primaryPartitionConstraint);
return keyProviderOp;
}
protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec) throws AlgebricksException {
- int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(datasetDecl).size();
- ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
- ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
- IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
- ISerializerDeserializerProvider serdeProvider = metadata.getFormat().getSerdeProvider();
- List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
- .getPartitioningFunctions(datasetDecl);
- int i = 0;
- for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : partitioningFunctions) {
- IAType keyType = evalFactoryAndType.third;
- ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
- primaryRecFields[i] = keySerde;
- primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
- keyType, OrderKind.ASC);
- primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- ++i;
- }
- primaryRecFields[numPrimaryKeys] = payloadSerde;
- primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
-
// -Infinity
int[] lowKeyFields = null;
// +Infinity
int[] highKeyFields = null;
- RecordDescriptor primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
-
BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
AsterixStorageManagerInterface.INSTANCE, AsterixTreeRegistryProvider.INSTANCE,
- primaryFileSplitProvider, primaryTypeTraits, primaryComparatorFactories, lowKeyFields, highKeyFields,
- true, true, new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
- spec, primarySearchOp, primaryPartitionConstrant);
+ primaryFileSplitProvider, primaryRecDesc.getTypeTraits(), primaryComparatorFactories, lowKeyFields,
+ highKeyFields, true, true, new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
+ primaryPartitionConstraint);
return primarySearchOp;
}
- protected void setSecondaryIndexFactories(BTreeSearchOperatorDescriptor primaryScanOp,
- List<String> secondaryKeyFields) throws AlgebricksException {
- int numSecondaryKeys = secondaryKeyFields.size();
- evalFactories = new IEvaluatorFactory[numSecondaryKeys];
- secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys + numPrimaryKeys];
- ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
- ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
- ISerializerDeserializerProvider serdeProvider = metadata.getFormat().getSerdeProvider();
- for (int i = 0; i < numSecondaryKeys; i++) {
- evalFactories[i] = metadata.getFormat().getFieldAccessEvaluatorFactory(itemType, secondaryKeyFields.get(i),
- numPrimaryKeys);
- IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(i), itemType);
- ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
- secondaryRecFields[i] = keySerde;
- secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
- keyType, OrderKind.ASC);
- secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- }
- // Fill in serializers and comparators for primary index fields.
- RecordDescriptor primaryRecDesc = primaryScanOp.getRecordDescriptor();
- IBinaryComparatorFactory[] primaryComparatorFactories = primaryScanOp.getTreeIndexComparatorFactories();
- for (int i = 0; i < numPrimaryKeys; i++) {
- secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
- secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
- secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
- }
- secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
- }
-
protected AlgebricksMetaOperatorDescriptor createAssignOp(JobSpecification spec,
- BTreeSearchOperatorDescriptor primaryScanOp, List<String> secondaryKeyFields) throws AlgebricksException {
- int numSecondaryKeys = secondaryKeyFields.size();
- ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
- IEvaluatorFactory[] evalFactories = new IEvaluatorFactory[numSecondaryKeys];
- IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys
- + numPrimaryKeys];
- ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
- ISerializerDeserializerProvider serdeProvider = metadata.getFormat().getSerdeProvider();
- for (int i = 0; i < numSecondaryKeys; i++) {
- evalFactories[i] = metadata.getFormat().getFieldAccessEvaluatorFactory(itemType, secondaryKeyFields.get(i),
- numPrimaryKeys);
- IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(i), itemType);
- ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
- secondaryRecFields[i] = keySerde;
- secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
- keyType, OrderKind.ASC);
- secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- }
- // Fill in serializers and comparators for primary index fields.
- RecordDescriptor primaryRecDesc = primaryScanOp.getRecordDescriptor();
- IBinaryComparatorFactory[] primaryComparatorFactories = primaryScanOp.getTreeIndexComparatorFactories();
- for (int i = 0; i < numPrimaryKeys; i++) {
- secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
- secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
- secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
- }
- RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
-
- int[] outColumns = new int[numSecondaryKeys];
- int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
- for (int i = 0; i < numSecondaryKeys; i++) {
+ BTreeSearchOperatorDescriptor primaryScanOp, int numSecondaryKeyFields) throws AlgebricksException {
+ int[] outColumns = new int[numSecondaryKeyFields];
+ int[] projectionList = new int[numSecondaryKeyFields + numPrimaryKeys];
+ for (int i = 0; i < numSecondaryKeyFields; i++) {
outColumns[i] = numPrimaryKeys + i + 1;
}
int projCount = 0;
- for (int i = 0; i < numSecondaryKeys; i++) {
+ for (int i = 0; i < numSecondaryKeyFields; i++) {
projectionList[projCount++] = numPrimaryKeys + i + 1;
}
for (int i = 0; i < numPrimaryKeys; i++) {
@@ -247,7 +226,7 @@
AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
- primaryPartitionConstrant);
+ primaryPartitionConstraint);
return asterixAssignOp;
}
@@ -260,7 +239,25 @@
ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
physOptConf.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories, secondaryRecDesc);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp,
- primaryPartitionConstrant);
+ primaryPartitionConstraint);
return sortOp;
}
+
+ protected TreeIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
+ int numSecondaryKeyFields, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor) throws MetadataException,
+ AlgebricksException {
+ int[] fieldPermutation = new int[numSecondaryKeyFields + numPrimaryKeys];
+ for (int i = 0; i < numSecondaryKeyFields + numPrimaryKeys; i++) {
+ fieldPermutation[i] = i;
+ }
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, secondaryIndexName);
+ TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec,
+ AsterixStorageManagerInterface.INSTANCE, AsterixTreeRegistryProvider.INSTANCE,
+ secondarySplitsAndConstraint.first, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories,
+ fieldPermutation, fillFactor, dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
+ secondarySplitsAndConstraint.second);
+ return treeIndexBulkLoadOp;
+ }
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
index 93b48a3..6fb8ee0 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
@@ -2,8 +2,6 @@
import java.util.List;
-import edu.uci.ics.asterix.common.context.AsterixStorageManagerInterface;
-import edu.uci.ics.asterix.common.context.AsterixTreeRegistryProvider;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
@@ -15,16 +13,10 @@
import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
import edu.uci.ics.asterix.translator.DmlTranslator.CompiledCreateIndexStatement;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
-import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.impl.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.AssignRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraintHelper;
import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
-import edu.uci.ics.hyracks.algebricks.core.utils.Pair;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
@@ -32,21 +24,65 @@
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeDataflowHelperFactory;
@SuppressWarnings("rawtypes")
public class SecondaryRTreeCreator extends SecondaryIndexCreator {
+ protected IPrimitiveValueProviderFactory[] valueProviderFactories;
+ protected int numNestedSecondaryKeyFields;
+
protected SecondaryRTreeCreator(PhysicalOptimizationConfig physOptConf) {
super(physOptConf);
}
@Override
+ protected void setSecondaryRecDescAndComparators(List<String> secondaryKeyFields) throws AlgebricksException,
+ AsterixException {
+ int numSecondaryKeys = secondaryKeyFields.size();
+ if (numSecondaryKeys != 1) {
+ throw new AsterixException(
+ "Cannot use "
+ + numSecondaryKeys
+ + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
+ }
+ IAType spatialType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(0), itemType);
+ if (spatialType == null) {
+ throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
+ }
+ int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
+ numNestedSecondaryKeyFields = numDimensions * 2;
+ evalFactories = metadata.getFormat().createMBRFactory(itemType, secondaryKeyFields.get(0), numPrimaryKeys,
+ numDimensions);
+ secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
+ valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+ ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
+ + numNestedSecondaryKeyFields];
+ ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
+ IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(0), itemType);
+ IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
+ for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
+ ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(nestedKeyType);
+ secondaryRecFields[i] = keySerde;
+ secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+ nestedKeyType, OrderKind.ASC);
+ secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
+ valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
+ }
+ // Add serializers and comparators for primary index fields.
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecDesc.getFields()[i];
+ secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryRecDesc.getTypeTraits()[i];
+ }
+ secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
+ }
+
+ @Override
public JobSpecification createJobSpec(CompiledCreateIndexStatement createIndexStmt,
AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
init(createIndexStmt, metadata);
@@ -58,97 +94,13 @@
// Create primary index scan op.
BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
- // ---------- START ASSIGN OP
+ // Assign op.
+ AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, primaryScanOp, numNestedSecondaryKeyFields);
- List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
- int numSecondaryKeys = secondaryKeyFields.size();
-
- if (numSecondaryKeys != 1) {
- throw new AsterixException(
- "Cannot use "
- + numSecondaryKeys
- + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
- }
-
- IAType spatialType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(0), itemType);
- if (spatialType == null) {
- throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
- }
-
- int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
- int numNestedSecondaryKeyFields = dimension * 2;
-
- IEvaluatorFactory[] evalFactories = metadata.getFormat().createMBRFactory(itemType, secondaryKeyFields.get(0),
- numPrimaryKeys, dimension);
-
- ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
- + numNestedSecondaryKeyFields];
- IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
- ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
- IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
-
- IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(0), itemType);
- IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
- for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
- ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(nestedKeyType);
- secondaryRecFields[i] = keySerde;
- secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
- nestedKeyType, OrderKind.ASC);
- secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
- valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
- }
-
- // Fill in serializers and comparators for primary index fields.
- RecordDescriptor primaryRecDesc = primaryScanOp.getRecordDescriptor();
- for (int i = 0; i < numPrimaryKeys; i++) {
- secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecDesc.getFields()[i];
- secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryRecDesc.getTypeTraits()[i];
- }
- RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
-
- int[] outColumns = new int[numNestedSecondaryKeyFields];
- int[] projectionList = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
- for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
- outColumns[i] = numPrimaryKeys + i + 1;
- }
- int projCount = 0;
- for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
- projectionList[projCount++] = numPrimaryKeys + i + 1;
- }
- for (int i = 0; i < numPrimaryKeys; i++) {
- projectionList[projCount++] = i;
- }
-
- AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
- AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
- new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
- primaryPartitionConstrant);
-
- // ---------- END ASSIGN OP
-
- // ---------- START SECONDARY INDEX BULK LOAD
-
- int[] fieldPermutation = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
- for (int i = 0; i < numNestedSecondaryKeyFields + numPrimaryKeys; i++) {
- fieldPermutation[i] = i;
- }
-
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, secondaryIndexName);
-
- TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(
- spec, AsterixStorageManagerInterface.INSTANCE, AsterixTreeRegistryProvider.INSTANCE,
- secondarySplitsAndConstraint.first, secondaryTypeTraits,
- secondaryComparatorFactories, fieldPermutation, 0.7f,
- new RTreeDataflowHelperFactory(valueProviderFactories),
- NoOpOperationCallbackProvider.INSTANCE);
-
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryBulkLoadOp,
- secondarySplitsAndConstraint.second);
-
- // ---------- END SECONDARY INDEX BULK LOAD
+ // Create secondary RTree bulk load op.
+ TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec,
+ numNestedSecondaryKeyFields, new RTreeDataflowHelperFactory(valueProviderFactories),
+ BTree.DEFAULT_FILL_FACTOR);
// Connect the operators.
spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);