More cleaning.

Fold the duplicated split-provider/partition-constraint lookups into the
single shared splitProviderAndConstraint pair, brace single-statement for
loops and scope their loop variables, and replace the START/END banner
comments with brief inline ones. Rewrite createRtreeIndexJobSpec to reuse
the createDummyKeyProviderOp and createPrimaryIndexScanOp helpers, rename
primaryIndexName to datasetName, reject external datasets before the type
lookup, and use AqlSerializerDeserializerProvider and the storage-manager
singletons directly instead of local aliases.
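
For reference, the two helpers that createRtreeIndexJobSpec now calls are
defined elsewhere in IndexOperations.java and are not part of this diff.
Below is a minimal sketch of plausible definitions, reconstructed from the
inline code this commit removes; the signatures are inferred from the call
sites and the actual helpers may differ:

    // Sketch only: rebuilt from the inlined key-provider code deleted below.
    private static AbstractOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec)
            throws AsterixException {
        // Build a tuple with a single dummy integer field; the constant tuple
        // source emits exactly one such tuple to prime the primary index scan.
        ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
        DataOutput dos = tb.getDataOutput();
        tb.reset();
        try {
            IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
        } catch (HyracksDataException e) {
            throw new AsterixException(e);
        }
        tb.addFieldEndOffset();
        ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
        return new ConstantTupleSourceOperatorDescriptor(spec, keyRecDesc,
                tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
    }

    // Likewise rebuilt from the removed inline primary-scan code.
    private static BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec,
            AqlCompiledMetadataDeclarations metadata, AqlCompiledDatasetDecl compiledDatasetDecl,
            ARecordType itemType, ISerializerDeserializer payloadSerde,
            IFileSplitProvider splitProvider) throws AlgebricksException {
        int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
        ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
        IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
        ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
        ISerializerDeserializerProvider serdeProvider = metadata.getFormat().getSerdeProvider();
        int i = 0;
        for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
                .getPartitioningFunctions(compiledDatasetDecl)) {
            IAType keyType = evalFactoryAndType.third;
            primaryRecFields[i] = serdeProvider.getSerializerDeserializer(keyType);
            primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
                    .getBinaryComparatorFactory(keyType, OrderKind.ASC);
            primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
            ++i;
        }
        primaryRecFields[numPrimaryKeys] = payloadSerde;
        primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
        // Carry the type traits in the record descriptor so callers can get
        // them back via getRecordDescriptor().getTypeTraits().
        RecordDescriptor primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
        // Null low/high key fields: scan the whole index, -infinity to +infinity.
        return new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
                AsterixStorageManagerInterface.INSTANCE, AsterixTreeRegistryProvider.INSTANCE,
                splitProvider, primaryTypeTraits, primaryComparatorFactories, null, null,
                true, true, new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
    }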
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix-fix-issue-9@196 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
index b87cae3..c0053fb 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
@@ -267,41 +267,34 @@
projectionList[projCount++] = i;
}
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> assignSplitsAndConstraint = metadata
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
-
AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
-
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
- assignSplitsAndConstraint.second);
+ splitProviderAndConstraint.second);
// ---------- END ASSIGN OP
// ---------- START EXTERNAL SORT OP
int[] sortFields = new int[numSecondaryKeys + numPrimaryKeys];
- for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++)
+ for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++) {
sortFields[i] = i;
-
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> sorterSplitsAndConstraint = metadata
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
-
+ }
ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
physicalOptimizationConfig.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories,
secondaryRecDesc);
-
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp,
- sorterSplitsAndConstraint.second);
+ splitProviderAndConstraint.second);
// ---------- END EXTERNAL SORT OP
// ---------- START SECONDARY INDEX BULK LOAD
int[] fieldPermutation = new int[numSecondaryKeys + numPrimaryKeys];
- for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++)
+ for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++) {
fieldPermutation[i] = i;
+ }
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, secondaryIndexName);
@@ -318,19 +311,12 @@
// ---------- END SECONDARY INDEX BULK LOAD
- // ---------- START CONNECT THE OPERATORS
-
+ // Connect the operators.
spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
-
spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
-
spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
-
spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
-
spec.addRoot(secondaryBulkLoadOp);
-
- // ---------- END CONNECT THE OPERATORS
spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
return spec;
@@ -339,103 +325,35 @@
@SuppressWarnings("unchecked")
public static JobSpecification createRtreeIndexJobSpec(CompiledCreateIndexStatement createIndexStmt,
AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
-
JobSpecification spec = new JobSpecification();
-
- String primaryIndexName = createIndexStmt.getDatasetName();
+ String datasetName = createIndexStmt.getDatasetName();
String secondaryIndexName = createIndexStmt.getIndexName();
-
- AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(primaryIndexName);
+ AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
if (compiledDatasetDecl == null) {
- throw new AsterixException("Could not find dataset " + primaryIndexName);
+ throw new AlgebricksException("Unknown dataset " + datasetName);
+ }
+ if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
+ throw new AsterixException("Cannot index an external dataset (" + datasetName + ").");
}
ARecordType itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());
- ISerializerDeserializerProvider serdeProvider = metadata.getFormat().getSerdeProvider();
- ISerializerDeserializer payloadSerde = serdeProvider.getSerializerDeserializer(itemType);
-
- if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
- throw new AsterixException("Cannot index an external dataset (" + primaryIndexName + ").");
- }
+ ISerializerDeserializer payloadSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(itemType);
int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
- // ---------- START GENERAL BTREE STUFF
-
- IIndexRegistryProvider<IIndex> treeRegistryProvider = AsterixTreeRegistryProvider.INSTANCE;
- IStorageManagerInterface storageManager = AsterixStorageManagerInterface.INSTANCE;
-
- // ---------- END GENERAL BTREE STUFF
-
- // ---------- START KEY PROVIDER OP
-
- // TODO: should actually be empty tuple source
- // build tuple containing low and high search keys
- ArrayTupleBuilder tb = new ArrayTupleBuilder(1); // just one dummy field
- DataOutput dos = tb.getDataOutput();
-
- tb.reset();
- try {
- IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
- } catch (HyracksDataException e) {
- throw new AsterixException(e);
- } // dummy field
- tb.addFieldEndOffset();
-
- ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
- RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
-
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> keyProviderSplitsAndConstraint = metadata
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
-
- ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
- keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp,
- keyProviderSplitsAndConstraint.second);
-
- // ---------- END KEY PROVIDER OP
-
- // ---------- START PRIMARY INDEX SCAN
-
- ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
- IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
- ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
- int i = 0;
- serdeProvider = metadata.getFormat().getSerdeProvider();
- for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
- .getPartitioningFunctions(compiledDatasetDecl)) {
- IAType keyType = evalFactoryAndType.third;
- ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
- primaryRecFields[i] = keySerde;
- primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
- keyType, OrderKind.ASC);
- primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- ++i;
- }
- primaryRecFields[numPrimaryKeys] = payloadSerde;
- primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
-
- ITreeIndexFrameFactory primaryInteriorFrameFactory = AqlMetadataProvider
- .createBTreeNSMInteriorFrameFactory(primaryTypeTraits);
- ITreeIndexFrameFactory primaryLeafFrameFactory = AqlMetadataProvider
- .createBTreeNSMLeafFrameFactory(primaryTypeTraits);
-
- int[] lowKeyFields = null; // -infinity
- int[] highKeyFields = null; // +infinity
- RecordDescriptor primaryRecDesc = new RecordDescriptor(primaryRecFields);
-
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadata
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
-
- BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(
- spec, primaryRecDesc, storageManager, treeRegistryProvider,
- primarySplitsAndConstraint.first, primaryTypeTraits,
- primaryComparatorFactories, lowKeyFields, highKeyFields, true,
- true, new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
-
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
- primarySplitsAndConstraint.second);
-
- // ---------- END PRIMARY INDEX SCAN
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
+
+ // Create dummy key provider for feeding the primary index scan.
+ AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+ spec, keyProviderOp, splitProviderAndConstraint.second);
+
+ // Create primary index scan op.
+ BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(
+ spec, metadata, compiledDatasetDecl, itemType, payloadSerde,
+ splitProviderAndConstraint.first);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+                spec, primaryScanOp, splitProviderAndConstraint.second);
// ---------- START ASSIGN OP
@@ -468,7 +386,7 @@
IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(0), itemType);
IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
- for (i = 0; i < numNestedSecondaryKeyFields; i++) {
+ for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(nestedKeyType);
secondaryRecFields[i] = keySerde;
@@ -478,49 +396,49 @@
valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
}
- // fill in serializers and comparators for primary index fields
- for (i = 0; i < numPrimaryKeys; i++) {
- secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecFields[i];
- secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryTypeTraits[i];
+ // Fill in serializers and comparators for primary index fields.
+ RecordDescriptor primaryRecDesc = primaryScanOp.getRecordDescriptor();
+ IBinaryComparatorFactory[] primaryComparatorFactories = primaryScanOp.getTreeIndexComparatorFactories();
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecDesc.getFields()[i];
+ secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryRecDesc.getTypeTraits()[i];
}
RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
int[] outColumns = new int[numNestedSecondaryKeyFields];
int[] projectionList = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
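+        // The assign op appends the computed secondary keys after the primary
+        // keys and payload, so the output columns start at numPrimaryKeys + 1.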
- for (i = 0; i < numNestedSecondaryKeyFields; i++) {
+ for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
outColumns[i] = numPrimaryKeys + i + 1;
}
int projCount = 0;
- for (i = 0; i < numNestedSecondaryKeyFields; i++) {
+ for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
projectionList[projCount++] = numPrimaryKeys + i + 1;
}
- for (i = 0; i < numPrimaryKeys; i++) {
+ for (int i = 0; i < numPrimaryKeys; i++) {
projectionList[projCount++] = i;
}
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> assignSplitsAndConstraint = metadata
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
-
AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
- assignSplitsAndConstraint.second);
+ splitProviderAndConstraint.second);
// ---------- END ASSIGN OP
// ---------- START SECONDARY INDEX BULK LOAD
int[] fieldPermutation = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
- for (i = 0; i < numNestedSecondaryKeyFields + numPrimaryKeys; i++)
+ for (int i = 0; i < numNestedSecondaryKeyFields + numPrimaryKeys; i++) {
fieldPermutation[i] = i;
+ }
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, secondaryIndexName);
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, secondaryIndexName);
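+        // Bulk-load the R-tree; 0.7f is the target page fill factor during load.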
TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(
- spec, storageManager, treeRegistryProvider,
+ spec, AsterixStorageManagerInterface.INSTANCE, AsterixTreeRegistryProvider.INSTANCE,
secondarySplitsAndConstraint.first, secondaryTypeTraits,
secondaryComparatorFactories, fieldPermutation, 0.7f,
new RTreeDataflowHelperFactory(valueProviderFactories),
@@ -531,17 +449,11 @@
// ---------- END SECONDARY INDEX BULK LOAD
- // ---------- START CONNECT THE OPERATORS
-
- spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primarySearchOp, 0);
-
- spec.connect(new OneToOneConnectorDescriptor(spec), primarySearchOp, 0, asterixAssignOp, 0);
-
+ // Connect the operators.
+ spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, secondaryBulkLoadOp, 0);
-
spec.addRoot(secondaryBulkLoadOp);
-
- // ---------- END CONNECT THE OPERATORS
spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
return spec;