First round of cleaning before actually fixing the issue.
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix-fix-issue-9@193 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
index c9cb798..b87cae3 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
@@ -47,6 +47,7 @@
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
@@ -120,104 +121,108 @@
return spec;
}
+ public static AbstractOperatorDescriptor createDummyKeyProviderOp(
+ JobSpecification spec) throws AsterixException, AlgebricksException {
+ // Build dummy tuple containing one field with a dummy value inside.
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
+ DataOutput dos = tb.getDataOutput();
+ tb.reset();
+ try {
+ // Serialize dummy value into a field.
+ IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
+ } catch (HyracksDataException e) {
+ throw new AsterixException(e);
+ }
+ // Add dummy field.
+ tb.addFieldEndOffset();
+ ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
+ RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+ ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(
+ spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
+ tb.getSize());
+ return keyProviderOp;
+ }
+
+ public static BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(
+ JobSpecification spec, AqlCompiledMetadataDeclarations metadata,
+ AqlCompiledDatasetDecl compiledDatasetDecl, ARecordType itemType,
+ ISerializerDeserializer payloadSerde,
+ IFileSplitProvider splitProvider) throws AlgebricksException,
+ MetadataException {
+ int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(
+ compiledDatasetDecl).size();
+ ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
+ ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
+ IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
+ ISerializerDeserializerProvider serdeProvider = metadata.getFormat()
+ .getSerdeProvider();
+ List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
+ .getPartitioningFunctions(compiledDatasetDecl);
+ int i = 0;
+ for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : partitioningFunctions) {
+ IAType keyType = evalFactoryAndType.third;
+ ISerializerDeserializer keySerde = serdeProvider
+ .getSerializerDeserializer(keyType);
+ primaryRecFields[i] = keySerde;
+ primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+ .getBinaryComparatorFactory(keyType, OrderKind.ASC);
+ primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE
+ .getTypeTrait(keyType);
+ ++i;
+ }
+ primaryRecFields[numPrimaryKeys] = payloadSerde;
+ primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE
+ .getTypeTrait(itemType);
+
+ // -Infinity
+ int[] lowKeyFields = null;
+ // +Infinity
+ int[] highKeyFields = null;
+ RecordDescriptor primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
+
+ BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(
+ spec, primaryRecDesc, AsterixStorageManagerInterface.INSTANCE,
+ AsterixTreeRegistryProvider.INSTANCE, splitProvider,
+ primaryTypeTraits, primaryComparatorFactories, lowKeyFields,
+ highKeyFields, true, true, new BTreeDataflowHelperFactory(),
+ NoOpOperationCallbackProvider.INSTANCE);
+ return primarySearchOp;
+ }
+
+
@SuppressWarnings("unchecked")
public static JobSpecification createBtreeIndexJobSpec(CompiledCreateIndexStatement createIndexStmt,
AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
-
JobSpecification spec = new JobSpecification();
-
String datasetName = createIndexStmt.getDatasetName();
String secondaryIndexName = createIndexStmt.getIndexName();
-
AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
if (compiledDatasetDecl == null) {
throw new AlgebricksException("Unknown dataset " + datasetName);
}
- ARecordType itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());
- ISerializerDeserializer payloadSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(itemType);
-
if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
throw new AsterixException("Cannot index an external dataset (" + datasetName + ").");
}
-
- AqlCompiledDatasetDecl srcCompiledDatasetDecl = compiledDatasetDecl;
+ ARecordType itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());
+ ISerializerDeserializer payloadSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(itemType);
int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
- // ---------- START GENERAL BTREE STUFF
- IIndexRegistryProvider<IIndex> treeRegistryProvider = AsterixTreeRegistryProvider.INSTANCE;
- IStorageManagerInterface storageManager = AsterixStorageManagerInterface.INSTANCE;
-
- // ---------- END GENERAL BTREE STUFF
-
- // ---------- START KEY PROVIDER OP
-
- // TODO: should actually be empty tuple source
- // build tuple containing low and high search keys
- ArrayTupleBuilder tb = new ArrayTupleBuilder(1); // just one dummy field
- DataOutput dos = tb.getDataOutput();
-
- tb.reset();
- try {
- IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
- } catch (HyracksDataException e) {
- throw new AsterixException(e);
- } // dummy field
- tb.addFieldEndOffset();
-
- ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
- RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
-
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> keyProviderSplitsAndConstraint = metadata
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndConstraint = metadata
.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
-
- ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
- keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp,
- keyProviderSplitsAndConstraint.second);
-
- // ---------- END KEY PROVIDER OP
-
- // ---------- START PRIMARY INDEX SCAN
-
- ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
- IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
- ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
- int i = 0;
- ISerializerDeserializerProvider serdeProvider = metadata.getFormat().getSerdeProvider();
- List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
- .getPartitioningFunctions(srcCompiledDatasetDecl);
- for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : partitioningFunctions) {
- IAType keyType = evalFactoryAndType.third;
- ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
- primaryRecFields[i] = keySerde;
- primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
- keyType, OrderKind.ASC);
- primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- ++i;
- }
- primaryRecFields[numPrimaryKeys] = payloadSerde;
- primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
-
- int[] lowKeyFields = null; // -infinity
- int[] highKeyFields = null; // +infinity
- RecordDescriptor primaryRecDesc = new RecordDescriptor(primaryRecFields);
-
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadata
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
-
- BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(
- spec, primaryRecDesc, storageManager, treeRegistryProvider,
- primarySplitsAndConstraint.first, primaryTypeTraits,
- primaryComparatorFactories, lowKeyFields, highKeyFields, true,
- true, new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
-
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
- primarySplitsAndConstraint.second);
-
- // ---------- END PRIMARY INDEX SCAN
-
+
+ // Create dummy key provider for feeding the primary index scan.
+ AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+ spec, keyProviderOp, splitProviderAndConstraint.second);
+
+ // Create primary index scan op.
+ BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(
+ spec, metadata, compiledDatasetDecl, itemType, payloadSerde,
+ splitProviderAndConstraint.first);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+ spec, keyProviderOp, splitProviderAndConstraint.second);
+
// ---------- START ASSIGN OP
List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
@@ -227,7 +232,9 @@
IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys
+ numPrimaryKeys];
ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
- for (i = 0; i < numSecondaryKeys; i++) {
+ ISerializerDeserializerProvider serdeProvider = metadata.getFormat()
+ .getSerdeProvider();
+ for (int i = 0; i < numSecondaryKeys; i++) {
evalFactories[i] = metadata.getFormat().getFieldAccessEvaluatorFactory(itemType, secondaryKeyFields.get(i),
numPrimaryKeys);
IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(i), itemType);
@@ -237,24 +244,26 @@
keyType, OrderKind.ASC);
secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
}
- // fill in serializers and comparators for primary index fields
- for (i = 0; i < numPrimaryKeys; i++) {
- secondaryRecFields[numSecondaryKeys + i] = primaryRecFields[i];
+ // Fill in serializers and comparators for primary index fields.
+ RecordDescriptor primaryRecDesc = primaryScanOp.getRecordDescriptor();
+ IBinaryComparatorFactory[] primaryComparatorFactories = primaryScanOp.getTreeIndexComparatorFactories();
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
+ secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
- secondaryTypeTraits[numSecondaryKeys + i] = primaryTypeTraits[i];
}
RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
int[] outColumns = new int[numSecondaryKeys];
int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
- for (i = 0; i < numSecondaryKeys; i++) {
+ for (int i = 0; i < numSecondaryKeys; i++) {
outColumns[i] = numPrimaryKeys + i + 1;
}
int projCount = 0;
- for (i = 0; i < numSecondaryKeys; i++) {
+ for (int i = 0; i < numSecondaryKeys; i++) {
projectionList[projCount++] = numPrimaryKeys + i + 1;
}
- for (i = 0; i < numPrimaryKeys; i++) {
+ for (int i = 0; i < numPrimaryKeys; i++) {
projectionList[projCount++] = i;
}
@@ -273,7 +282,7 @@
// ---------- START EXTERNAL SORT OP
int[] sortFields = new int[numSecondaryKeys + numPrimaryKeys];
- for (i = 0; i < numSecondaryKeys + numPrimaryKeys; i++)
+ for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++)
sortFields[i] = i;
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> sorterSplitsAndConstraint = metadata
@@ -291,7 +300,7 @@
// ---------- START SECONDARY INDEX BULK LOAD
int[] fieldPermutation = new int[numSecondaryKeys + numPrimaryKeys];
- for (i = 0; i < numSecondaryKeys + numPrimaryKeys; i++)
+ for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++)
fieldPermutation[i] = i;
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
@@ -299,7 +308,7 @@
// GlobalConfig.DEFAULT_BTREE_FILL_FACTOR
TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(
- spec, storageManager, treeRegistryProvider,
+ spec, AsterixStorageManagerInterface.INSTANCE, AsterixTreeRegistryProvider.INSTANCE,
secondarySplitsAndConstraint.first, secondaryTypeTraits,
secondaryComparatorFactories, fieldPermutation, 0.7f,
new BTreeDataflowHelperFactory(),
@@ -311,9 +320,9 @@
// ---------- START CONNECT THE OPERATORS
- spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primarySearchOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), primarySearchOp, 0, asterixAssignOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);