Fixed various bugs that prevented an LSM index from being partitioned across I/O devices.
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
index 3600bea..742de84 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
@@ -171,7 +171,7 @@
//prepare a LocalResourceMetadata which will be stored in NC's local resource repository
ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(typeTraits,
comparatorFactories, blooFilterKeyFields, true, storageProperties.getMemoryComponentPageSize(),
- storageProperties.getMemoryComponentNumPages());
+ storageProperties.getMemoryComponentNumPages(), fs);
ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
localResourceMetadata, LocalResource.LSMBTreeResource);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
index c3a2c01..20b66e9 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
@@ -40,7 +40,8 @@
//prepare a LocalResourceMetadata which will be stored in NC's local resource repository
ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(
secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, secondaryBloomFilterKeyFields, false,
- storageProperties.getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages());
+ storageProperties.getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
+ secondaryFileSplitProvider.getFileSplits());
ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
localResourceMetadata, LocalResource.LSMBTreeResource);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
index 253df4b..d1666a7 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
@@ -76,308 +76,366 @@
import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
@SuppressWarnings("rawtypes")
-// TODO: We should eventually have a hierarchy of classes that can create all possible index job specs,
+// TODO: We should eventually have a hierarchy of classes that can create all
+// possible index job specs,
// not just for creation.
public abstract class SecondaryIndexCreator {
- protected final PhysicalOptimizationConfig physOptConf;
+ protected final PhysicalOptimizationConfig physOptConf;
- protected int numPrimaryKeys;
- protected int numSecondaryKeys;
- protected AqlMetadataProvider metadataProvider;
- protected String dataverseName;
- protected String datasetName;
- protected Dataset dataset;
- protected ARecordType itemType;
- protected ISerializerDeserializer payloadSerde;
- protected IFileSplitProvider primaryFileSplitProvider;
- protected AlgebricksPartitionConstraint primaryPartitionConstraint;
- protected IFileSplitProvider secondaryFileSplitProvider;
- protected AlgebricksPartitionConstraint secondaryPartitionConstraint;
- protected String secondaryIndexName;
- protected boolean anySecondaryKeyIsNullable = false;
+ protected int numPrimaryKeys;
+ protected int numSecondaryKeys;
+ protected AqlMetadataProvider metadataProvider;
+ protected String dataverseName;
+ protected String datasetName;
+ protected Dataset dataset;
+ protected ARecordType itemType;
+ protected ISerializerDeserializer payloadSerde;
+ protected IFileSplitProvider primaryFileSplitProvider;
+ protected AlgebricksPartitionConstraint primaryPartitionConstraint;
+ protected IFileSplitProvider secondaryFileSplitProvider;
+ protected AlgebricksPartitionConstraint secondaryPartitionConstraint;
+ protected String secondaryIndexName;
+ protected boolean anySecondaryKeyIsNullable = false;
- protected long numElementsHint;
- protected IBinaryComparatorFactory[] primaryComparatorFactories;
- protected int[] primaryBloomFilterKeyFields;
- protected RecordDescriptor primaryRecDesc;
- protected IBinaryComparatorFactory[] secondaryComparatorFactories;
- protected int[] secondaryBloomFilterKeyFields;
- protected RecordDescriptor secondaryRecDesc;
- protected ICopyEvaluatorFactory[] secondaryFieldAccessEvalFactories;
+ protected long numElementsHint;
+ protected IBinaryComparatorFactory[] primaryComparatorFactories;
+ protected int[] primaryBloomFilterKeyFields;
+ protected RecordDescriptor primaryRecDesc;
+ protected IBinaryComparatorFactory[] secondaryComparatorFactories;
+ protected int[] secondaryBloomFilterKeyFields;
+ protected RecordDescriptor secondaryRecDesc;
+ protected ICopyEvaluatorFactory[] secondaryFieldAccessEvalFactories;
- protected IAsterixPropertiesProvider propertiesProvider;
+ protected IAsterixPropertiesProvider propertiesProvider;
- // Prevent public construction. Should be created via createIndexCreator().
- protected SecondaryIndexCreator(PhysicalOptimizationConfig physOptConf,
- IAsterixPropertiesProvider propertiesProvider) {
- this.physOptConf = physOptConf;
- this.propertiesProvider = propertiesProvider;
- }
+ // Prevent public construction. Should be created via createIndexCreator().
+ protected SecondaryIndexCreator(PhysicalOptimizationConfig physOptConf,
+ IAsterixPropertiesProvider propertiesProvider) {
+ this.physOptConf = physOptConf;
+ this.propertiesProvider = propertiesProvider;
+ }
- public static SecondaryIndexCreator createIndexCreator(CompiledCreateIndexStatement createIndexStmt,
- AqlMetadataProvider metadataProvider, PhysicalOptimizationConfig physOptConf) throws AsterixException,
- AlgebricksException {
- IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
- SecondaryIndexCreator indexCreator = null;
- switch (createIndexStmt.getIndexType()) {
- case BTREE: {
- indexCreator = new SecondaryBTreeCreator(physOptConf, asterixPropertiesProvider);
- break;
- }
- case RTREE: {
- indexCreator = new SecondaryRTreeCreator(physOptConf, asterixPropertiesProvider);
- break;
- }
- case WORD_INVIX:
- case NGRAM_INVIX:
- case FUZZY_WORD_INVIX:
- case FUZZY_NGRAM_INVIX: {
- indexCreator = new SecondaryInvertedIndexCreator(physOptConf, asterixPropertiesProvider);
- break;
- }
- default: {
- throw new AsterixException("Unknown Index Type: " + createIndexStmt.getIndexType());
- }
- }
- indexCreator.init(createIndexStmt, metadataProvider);
- return indexCreator;
- }
+ public static SecondaryIndexCreator createIndexCreator(
+ CompiledCreateIndexStatement createIndexStmt,
+ AqlMetadataProvider metadataProvider,
+ PhysicalOptimizationConfig physOptConf) throws AsterixException,
+ AlgebricksException {
+ IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo
+ .getInstance();
+ SecondaryIndexCreator indexCreator = null;
+ switch (createIndexStmt.getIndexType()) {
+ case BTREE: {
+ indexCreator = new SecondaryBTreeCreator(physOptConf,
+ asterixPropertiesProvider);
+ break;
+ }
+ case RTREE: {
+ indexCreator = new SecondaryRTreeCreator(physOptConf,
+ asterixPropertiesProvider);
+ break;
+ }
+ case WORD_INVIX:
+ case NGRAM_INVIX:
+ case FUZZY_WORD_INVIX:
+ case FUZZY_NGRAM_INVIX: {
+ indexCreator = new SecondaryInvertedIndexCreator(physOptConf,
+ asterixPropertiesProvider);
+ break;
+ }
+ default: {
+ throw new AsterixException("Unknown Index Type: "
+ + createIndexStmt.getIndexType());
+ }
+ }
+ indexCreator.init(createIndexStmt, metadataProvider);
+ return indexCreator;
+ }
- public abstract JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException;
+ public abstract JobSpecification buildCreationJobSpec()
+ throws AsterixException, AlgebricksException;
- public abstract JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException;
+ public abstract JobSpecification buildLoadingJobSpec()
+ throws AsterixException, AlgebricksException;
- protected void init(CompiledCreateIndexStatement createIndexStmt, AqlMetadataProvider metadataProvider)
- throws AsterixException, AlgebricksException {
- this.metadataProvider = metadataProvider;
- dataverseName = createIndexStmt.getDataverseName() == null ? metadataProvider.getDefaultDataverseName()
- : createIndexStmt.getDataverseName();
- datasetName = createIndexStmt.getDatasetName();
- secondaryIndexName = createIndexStmt.getIndexName();
- dataset = metadataProvider.findDataset(dataverseName, datasetName);
- if (dataset == null) {
- throw new AsterixException("Unknown dataset " + datasetName);
- }
- if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
- throw new AsterixException("Cannot index an external dataset (" + datasetName + ").");
- }
- itemType = (ARecordType) metadataProvider.findType(dataset.getDataverseName(), dataset.getItemTypeName());
- payloadSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
- numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
- numSecondaryKeys = createIndexStmt.getKeyFields().size();
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverseName, datasetName, datasetName);
- primaryFileSplitProvider = primarySplitsAndConstraint.first;
- primaryPartitionConstraint = primarySplitsAndConstraint.second;
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverseName, datasetName,
- secondaryIndexName);
- secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
- secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
- // Must be called in this order.
- setPrimaryRecDescAndComparators();
- setSecondaryRecDescAndComparators(createIndexStmt, metadataProvider);
+ protected void init(CompiledCreateIndexStatement createIndexStmt,
+ AqlMetadataProvider metadataProvider) throws AsterixException,
+ AlgebricksException {
+ this.metadataProvider = metadataProvider;
+ dataverseName = createIndexStmt.getDataverseName() == null ? metadataProvider
+ .getDefaultDataverseName() : createIndexStmt.getDataverseName();
+ datasetName = createIndexStmt.getDatasetName();
+ secondaryIndexName = createIndexStmt.getIndexName();
+ dataset = metadataProvider.findDataset(dataverseName, datasetName);
+ if (dataset == null) {
+ throw new AsterixException("Unknown dataset " + datasetName);
+ }
+ if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+ throw new AsterixException("Cannot index an external dataset ("
+ + datasetName + ").");
+ }
+ itemType = (ARecordType) metadataProvider.findType(
+ dataset.getDataverseName(), dataset.getItemTypeName());
+ payloadSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(itemType);
+ numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
+ numSecondaryKeys = createIndexStmt.getKeyFields().size();
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
+ dataverseName, datasetName, datasetName);
+ primaryFileSplitProvider = primarySplitsAndConstraint.first;
+ primaryPartitionConstraint = primarySplitsAndConstraint.second;
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
+ dataverseName, datasetName, secondaryIndexName);
+ secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
+ secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
+ // Must be called in this order.
+ setPrimaryRecDescAndComparators();
+ setSecondaryRecDescAndComparators(createIndexStmt, metadataProvider);
- String numElementsHintString = dataset.getHints().get("CARDINALITY");
- if (numElementsHintString == null) {
- numElementsHint = DatasetCardinalityHint.DEFAULT;
- } else {
- numElementsHint = Long.parseLong(dataset.getHints().get("CARDINALITY"));
- }
- }
+ String numElementsHintString = dataset.getHints().get("CARDINALITY");
+ if (numElementsHintString == null) {
+ numElementsHint = DatasetCardinalityHint.DEFAULT;
+ } else {
+ numElementsHint = Long.parseLong(dataset.getHints().get(
+ "CARDINALITY"));
+ }
+ }
- protected void setPrimaryRecDescAndComparators() throws AlgebricksException {
- List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
- int numPrimaryKeys = partitioningKeys.size();
- ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
- ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
- primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
- primaryBloomFilterKeyFields = new int[numPrimaryKeys];
- ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
- for (int i = 0; i < numPrimaryKeys; i++) {
- IAType keyType;
- try {
- keyType = itemType.getFieldType(partitioningKeys.get(i));
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- primaryRecFields[i] = serdeProvider.getSerializerDeserializer(keyType);
- primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
- keyType, true);
- primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- primaryBloomFilterKeyFields[i] = i;
- }
- primaryRecFields[numPrimaryKeys] = payloadSerde;
- primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
- primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
- }
+ protected void setPrimaryRecDescAndComparators() throws AlgebricksException {
+ List<String> partitioningKeys = DatasetUtils
+ .getPartitioningKeys(dataset);
+ int numPrimaryKeys = partitioningKeys.size();
+ ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
+ ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
+ primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
+ primaryBloomFilterKeyFields = new int[numPrimaryKeys];
+ ISerializerDeserializerProvider serdeProvider = metadataProvider
+ .getFormat().getSerdeProvider();
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ IAType keyType;
+ try {
+ keyType = itemType.getFieldType(partitioningKeys.get(i));
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ primaryRecFields[i] = serdeProvider
+ .getSerializerDeserializer(keyType);
+ primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+ .getBinaryComparatorFactory(keyType, true);
+ primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE
+ .getTypeTrait(keyType);
+ primaryBloomFilterKeyFields[i] = i;
+ }
+ primaryRecFields[numPrimaryKeys] = payloadSerde;
+ primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE
+ .getTypeTrait(itemType);
+ primaryRecDesc = new RecordDescriptor(primaryRecFields,
+ primaryTypeTraits);
+ }
- protected void setSecondaryRecDescAndComparators(CompiledCreateIndexStatement createIndexStmt,
- AqlMetadataProvider metadataProvider) throws AlgebricksException, AsterixException {
- List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
- secondaryFieldAccessEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeys];
- secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys + numPrimaryKeys];
- secondaryBloomFilterKeyFields = new int[numSecondaryKeys];
- ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
- ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
- ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
- ITypeTraitProvider typeTraitProvider = metadataProvider.getFormat().getTypeTraitProvider();
- IBinaryComparatorFactoryProvider comparatorFactoryProvider = metadataProvider.getFormat()
- .getBinaryComparatorFactoryProvider();
- for (int i = 0; i < numSecondaryKeys; i++) {
- secondaryFieldAccessEvalFactories[i] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
- itemType, secondaryKeyFields.get(i), numPrimaryKeys);
- Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(i), itemType);
- IAType keyType = keyTypePair.first;
- anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second;
- ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
- secondaryRecFields[i] = keySerde;
- secondaryComparatorFactories[i] = comparatorFactoryProvider.getBinaryComparatorFactory(keyType, true);
- secondaryTypeTraits[i] = typeTraitProvider.getTypeTrait(keyType);
- secondaryBloomFilterKeyFields[i] = i;
- }
- // Add serializers and comparators for primary index fields.
- for (int i = 0; i < numPrimaryKeys; i++) {
- secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
- secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
- secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
- }
- secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
- }
+ protected void setSecondaryRecDescAndComparators(
+ CompiledCreateIndexStatement createIndexStmt,
+ AqlMetadataProvider metadataProvider) throws AlgebricksException,
+ AsterixException {
+ List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
+ secondaryFieldAccessEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeys];
+ secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys
+ + numPrimaryKeys];
+ secondaryBloomFilterKeyFields = new int[numSecondaryKeys];
+ ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
+ + numSecondaryKeys];
+ ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys
+ + numPrimaryKeys];
+ ISerializerDeserializerProvider serdeProvider = metadataProvider
+ .getFormat().getSerdeProvider();
+ ITypeTraitProvider typeTraitProvider = metadataProvider.getFormat()
+ .getTypeTraitProvider();
+ IBinaryComparatorFactoryProvider comparatorFactoryProvider = metadataProvider
+ .getFormat().getBinaryComparatorFactoryProvider();
+ for (int i = 0; i < numSecondaryKeys; i++) {
+ secondaryFieldAccessEvalFactories[i] = metadataProvider.getFormat()
+ .getFieldAccessEvaluatorFactory(itemType,
+ secondaryKeyFields.get(i), numPrimaryKeys);
+ Pair<IAType, Boolean> keyTypePair = Index
+ .getNonNullableKeyFieldType(secondaryKeyFields.get(i),
+ itemType);
+ IAType keyType = keyTypePair.first;
+ anySecondaryKeyIsNullable = anySecondaryKeyIsNullable
+ || keyTypePair.second;
+ ISerializerDeserializer keySerde = serdeProvider
+ .getSerializerDeserializer(keyType);
+ secondaryRecFields[i] = keySerde;
+ secondaryComparatorFactories[i] = comparatorFactoryProvider
+ .getBinaryComparatorFactory(keyType, true);
+ secondaryTypeTraits[i] = typeTraitProvider.getTypeTrait(keyType);
+ secondaryBloomFilterKeyFields[i] = i;
+ }
+ // Add serializers and comparators for primary index fields.
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc
+ .getFields()[i];
+ secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc
+ .getTypeTraits()[i];
+ secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
+ }
+ secondaryRecDesc = new RecordDescriptor(secondaryRecFields,
+ secondaryTypeTraits);
+ }
- protected AbstractOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec) throws AsterixException,
- AlgebricksException {
- // Build dummy tuple containing one field with a dummy value inside.
- ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
- DataOutput dos = tb.getDataOutput();
- tb.reset();
- try {
- // Serialize dummy value into a field.
- IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
- } catch (HyracksDataException e) {
- throw new AsterixException(e);
- }
- // Add dummy field.
- tb.addFieldEndOffset();
- ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
- RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
- ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
- keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp,
- primaryPartitionConstraint);
- return keyProviderOp;
- }
+ protected AbstractOperatorDescriptor createDummyKeyProviderOp(
+ JobSpecification spec) throws AsterixException, AlgebricksException {
+ // Build dummy tuple containing one field with a dummy value inside.
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
+ DataOutput dos = tb.getDataOutput();
+ tb.reset();
+ try {
+ // Serialize dummy value into a field.
+ IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
+ } catch (HyracksDataException e) {
+ throw new AsterixException(e);
+ }
+ // Add dummy field.
+ tb.addFieldEndOffset();
+ ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
+ RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+ ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(
+ spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
+ tb.getSize());
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+ spec, keyProviderOp, primaryPartitionConstraint);
+ return keyProviderOp;
+ }
- protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec) throws AlgebricksException {
- // -Infinity
- int[] lowKeyFields = null;
- // +Infinity
- int[] highKeyFields = null;
- AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
- BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
- primaryFileSplitProvider, primaryRecDesc.getTypeTraits(), primaryComparatorFactories,
- primaryBloomFilterKeyFields, lowKeyFields, highKeyFields, true, true,
- new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, storageProperties
- .getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
- storageProperties.getBloomFilterFalsePositiveRate()), false,
- NoOpOperationCallbackFactory.INSTANCE);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
- primaryPartitionConstraint);
- return primarySearchOp;
- }
+ protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(
+ JobSpecification spec) throws AlgebricksException {
+ // -Infinity
+ int[] lowKeyFields = null;
+ // +Infinity
+ int[] highKeyFields = null;
+ AsterixStorageProperties storageProperties = propertiesProvider
+ .getStorageProperties();
+ BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(
+ spec, primaryRecDesc,
+ AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+ primaryFileSplitProvider, primaryRecDesc.getTypeTraits(),
+ primaryComparatorFactories, primaryBloomFilterKeyFields,
+ lowKeyFields, highKeyFields, true, true,
+ new LSMBTreeDataflowHelperFactory(
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ storageProperties.getMemoryComponentPageSize(),
+ storageProperties.getMemoryComponentNumPages(),
+ storageProperties.getBloomFilterFalsePositiveRate()),
+ false, NoOpOperationCallbackFactory.INSTANCE);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+ spec, primarySearchOp, primaryPartitionConstraint);
+ return primarySearchOp;
+ }
- protected AlgebricksMetaOperatorDescriptor createAssignOp(JobSpecification spec,
- BTreeSearchOperatorDescriptor primaryScanOp, int numSecondaryKeyFields) throws AlgebricksException {
- int[] outColumns = new int[numSecondaryKeyFields];
- int[] projectionList = new int[numSecondaryKeyFields + numPrimaryKeys];
- for (int i = 0; i < numSecondaryKeyFields; i++) {
- outColumns[i] = numPrimaryKeys + i + 1;
- }
- int projCount = 0;
- for (int i = 0; i < numSecondaryKeyFields; i++) {
- projectionList[projCount++] = numPrimaryKeys + i + 1;
- }
- for (int i = 0; i < numPrimaryKeys; i++) {
- projectionList[projCount++] = i;
- }
- IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
- for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) {
- sefs[i] = new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
- secondaryFieldAccessEvalFactories[i]);
- }
- AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
- AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
- new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
- primaryPartitionConstraint);
- return asterixAssignOp;
- }
+ protected AlgebricksMetaOperatorDescriptor createAssignOp(
+ JobSpecification spec, BTreeSearchOperatorDescriptor primaryScanOp,
+ int numSecondaryKeyFields) throws AlgebricksException {
+ int[] outColumns = new int[numSecondaryKeyFields];
+ int[] projectionList = new int[numSecondaryKeyFields + numPrimaryKeys];
+ for (int i = 0; i < numSecondaryKeyFields; i++) {
+ outColumns[i] = numPrimaryKeys + i + 1;
+ }
+ int projCount = 0;
+ for (int i = 0; i < numSecondaryKeyFields; i++) {
+ projectionList[projCount++] = numPrimaryKeys + i + 1;
+ }
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ projectionList[projCount++] = i;
+ }
+ IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
+ for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) {
+ sefs[i] = new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
+ secondaryFieldAccessEvalFactories[i]);
+ }
+ AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns,
+ sefs, projectionList);
+ AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(
+ spec, 1, 1, new IPushRuntimeFactory[] { assign },
+ new RecordDescriptor[] { secondaryRecDesc });
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+ spec, asterixAssignOp, primaryPartitionConstraint);
+ return asterixAssignOp;
+ }
- protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec,
- IBinaryComparatorFactory[] secondaryComparatorFactories, RecordDescriptor secondaryRecDesc) {
- int[] sortFields = new int[secondaryComparatorFactories.length];
- for (int i = 0; i < secondaryComparatorFactories.length; i++) {
- sortFields[i] = i;
- }
- ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
- physOptConf.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories, secondaryRecDesc);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstraint);
- return sortOp;
- }
+ protected ExternalSortOperatorDescriptor createSortOp(
+ JobSpecification spec,
+ IBinaryComparatorFactory[] secondaryComparatorFactories,
+ RecordDescriptor secondaryRecDesc) {
+ int[] sortFields = new int[secondaryComparatorFactories.length];
+ for (int i = 0; i < secondaryComparatorFactories.length; i++) {
+ sortFields[i] = i;
+ }
+ ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(
+ spec, physOptConf.getMaxFramesExternalSort(), sortFields,
+ secondaryComparatorFactories, secondaryRecDesc);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+ spec, sortOp, primaryPartitionConstraint);
+ return sortOp;
+ }
- protected TreeIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
- int numSecondaryKeyFields, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor)
- throws MetadataException, AlgebricksException {
- int[] fieldPermutation = new int[numSecondaryKeyFields + numPrimaryKeys];
- for (int i = 0; i < numSecondaryKeyFields + numPrimaryKeys; i++) {
- fieldPermutation[i] = i;
- }
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverseName, datasetName,
- secondaryIndexName);
- TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- secondarySplitsAndConstraint.first, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories,
- secondaryBloomFilterKeyFields, fieldPermutation, fillFactor, false, numElementsHint,
- dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
- secondarySplitsAndConstraint.second);
- return treeIndexBulkLoadOp;
- }
+ protected TreeIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(
+ JobSpecification spec, int numSecondaryKeyFields,
+ IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor)
+ throws MetadataException, AlgebricksException {
+ int[] fieldPermutation = new int[numSecondaryKeyFields + numPrimaryKeys];
+ for (int i = 0; i < numSecondaryKeyFields + numPrimaryKeys; i++) {
+ fieldPermutation[i] = i;
+ }
+ TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(
+ spec, AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
+ secondaryFileSplitProvider, secondaryRecDesc.getTypeTraits(),
+ secondaryComparatorFactories, secondaryBloomFilterKeyFields,
+ fieldPermutation, fillFactor, false, numElementsHint,
+ dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+ spec, treeIndexBulkLoadOp, secondaryPartitionConstraint);
+ return treeIndexBulkLoadOp;
+ }
- public AlgebricksMetaOperatorDescriptor createFilterNullsSelectOp(JobSpecification spec, int numSecondaryKeyFields)
- throws AlgebricksException {
- ICopyEvaluatorFactory[] andArgsEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeyFields];
- NotDescriptor notDesc = new NotDescriptor();
- IsNullDescriptor isNullDesc = new IsNullDescriptor();
- for (int i = 0; i < numSecondaryKeyFields; i++) {
- // Access column i, and apply 'is not null'.
- ColumnAccessEvalFactory columnAccessEvalFactory = new ColumnAccessEvalFactory(i);
- ICopyEvaluatorFactory isNullEvalFactory = isNullDesc
- .createEvaluatorFactory(new ICopyEvaluatorFactory[] { columnAccessEvalFactory });
- ICopyEvaluatorFactory notEvalFactory = notDesc
- .createEvaluatorFactory(new ICopyEvaluatorFactory[] { isNullEvalFactory });
- andArgsEvalFactories[i] = notEvalFactory;
- }
- ICopyEvaluatorFactory selectCond = null;
- if (numSecondaryKeyFields > 1) {
- // Create conjunctive condition where all secondary index keys must satisfy 'is not null'.
- AndDescriptor andDesc = new AndDescriptor();
- selectCond = andDesc.createEvaluatorFactory(andArgsEvalFactories);
- } else {
- selectCond = andArgsEvalFactories[0];
- }
- StreamSelectRuntimeFactory select = new StreamSelectRuntimeFactory(
- new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(selectCond),
- null, AqlBinaryBooleanInspectorImpl.FACTORY);
- AlgebricksMetaOperatorDescriptor asterixSelectOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
- new IPushRuntimeFactory[] { select }, new RecordDescriptor[] { secondaryRecDesc });
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixSelectOp,
- primaryPartitionConstraint);
- return asterixSelectOp;
- }
+ public AlgebricksMetaOperatorDescriptor createFilterNullsSelectOp(
+ JobSpecification spec, int numSecondaryKeyFields)
+ throws AlgebricksException {
+ ICopyEvaluatorFactory[] andArgsEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeyFields];
+ NotDescriptor notDesc = new NotDescriptor();
+ IsNullDescriptor isNullDesc = new IsNullDescriptor();
+ for (int i = 0; i < numSecondaryKeyFields; i++) {
+ // Access column i, and apply 'is not null'.
+ ColumnAccessEvalFactory columnAccessEvalFactory = new ColumnAccessEvalFactory(
+ i);
+ ICopyEvaluatorFactory isNullEvalFactory = isNullDesc
+ .createEvaluatorFactory(new ICopyEvaluatorFactory[] { columnAccessEvalFactory });
+ ICopyEvaluatorFactory notEvalFactory = notDesc
+ .createEvaluatorFactory(new ICopyEvaluatorFactory[] { isNullEvalFactory });
+ andArgsEvalFactories[i] = notEvalFactory;
+ }
+ ICopyEvaluatorFactory selectCond = null;
+ if (numSecondaryKeyFields > 1) {
+ // Create conjunctive condition where all secondary index keys must
+ // satisfy 'is not null'.
+ AndDescriptor andDesc = new AndDescriptor();
+ selectCond = andDesc.createEvaluatorFactory(andArgsEvalFactories);
+ } else {
+ selectCond = andArgsEvalFactories[0];
+ }
+ StreamSelectRuntimeFactory select = new StreamSelectRuntimeFactory(
+ new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
+ selectCond), null,
+ AqlBinaryBooleanInspectorImpl.FACTORY);
+ AlgebricksMetaOperatorDescriptor asterixSelectOp = new AlgebricksMetaOperatorDescriptor(
+ spec, 1, 1, new IPushRuntimeFactory[] { select },
+ new RecordDescriptor[] { secondaryRecDesc });
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
+ spec, asterixSelectOp, primaryPartitionConstraint);
+ return asterixSelectOp;
+ }
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
index 366e247..b84f322 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
@@ -157,7 +157,7 @@
ILocalResourceMetadata localResourceMetadata = new LSMInvertedIndexLocalResourceMetadata(invListsTypeTraits,
primaryComparatorFactories, tokenTypeTraits, tokenComparatorFactories, tokenizerFactory,
storageProperties.getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
- isPartitioned);
+ isPartitioned, secondaryFileSplitProvider.getFileSplits());
ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
localResourceMetadata, LocalResource.LSMInvertedIndexResource);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
index 89a59e8..16e65f9 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
@@ -65,7 +65,7 @@
secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, primaryComparatorFactories,
valueProviderFactories, RTreePolicyType.RTREE, AqlMetadataProvider.proposeLinearizer(keyType,
secondaryComparatorFactories.length), storageProperties.getMemoryComponentPageSize(),
- storageProperties.getMemoryComponentNumPages());
+ storageProperties.getMemoryComponentNumPages(), secondaryFileSplitProvider.getFileSplits());
ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
localResourceMetadata, LocalResource.LSMRTreeResource);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index f16c09a..d8fd7c2 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -358,12 +358,14 @@
AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(typeTraits,
comparatorFactories, bloomFilterKeyFields, index.isPrimaryIndex(),
- storageProperties.getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages());
+ storageProperties.getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
+ runtimeContext.getMetaDataIODeviceId());
ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
localResourceMetadata, LocalResource.LSMBTreeResource);
ILocalResourceFactory localResourceFactory = localResourceFactoryProvider.getLocalResourceFactory();
- localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, file.getFile()
- .getPath(), 0));
+ localResourceRepository.insert(
+ localResourceFactory.createLocalResource(resourceID, file.getFile().getPath(), 0),
+ runtimeContext.getMetaDataIODeviceId());
} else {
resourceID = localResourceRepository.getResourceByName(file.getFile().getPath()).getResourceId();
lsmBtree = (LSMBTree) indexLifecycleManager.getIndex(resourceID);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index ef292cd..cde0cd9 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -418,7 +418,8 @@
adapterFactoryClassname = adapterFactoryMapping.get(adapterName);
if (adapterFactoryClassname != null) {
} else {
- // adapterName has been provided as a fully qualified classname
+ // adapterName has been provided as a fully qualified
+ // classname
adapterFactoryClassname = adapterName;
}
adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
@@ -768,9 +769,11 @@
numElementsHint = Long.parseLong(dataset.getHints().get("CARDINALITY"));
}
- //TODO
- //figure out the right behavior of the bulkload and then give the right callback
- //(ex. what's the expected behavior when there is an error during bulkload?)
+ // TODO
+ // figure out the right behavior of the bulkload and then give the
+ // right callback
+ // (ex. what's the expected behavior when there is an error during
+ // bulkload?)
TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation,
@@ -829,7 +832,7 @@
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
dataSource.getId().getDataverseName(), datasetName, indexName);
- //prepare callback
+ // prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
int datasetId = dataset.getDatasetId();
int[] primaryKeyFields = new int[numKeys];
@@ -1022,7 +1025,7 @@
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
dataverseName, datasetName, indexName);
- //prepare callback
+ // prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
int datasetId = dataset.getDatasetId();
int[] primaryKeyFields = new int[primaryKeys.size()];
@@ -1137,7 +1140,8 @@
tokenComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
if (isPartitioned) {
- // The partitioning field is hardcoded to be a short *without* an Asterix type tag.
+ // The partitioning field is hardcoded to be a short *without*
+ // an Asterix type tag.
tokenComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS;
}
@@ -1148,7 +1152,7 @@
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
dataverseName, datasetName, indexName);
- //prepare callback
+ // prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
int datasetId = dataset.getDatasetId();
int[] primaryKeyFields = new int[primaryKeys.size()];
@@ -1243,7 +1247,7 @@
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
dataverseName, datasetName, indexName);
- //prepare callback
+ // prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
int datasetId = dataset.getDatasetId();
int[] primaryKeyFields = new int[numPrimaryKeys];
@@ -1337,8 +1341,13 @@
continue;
}
for (int i = 0; i < nodeStores.length; i++) {
- File f = new File(nodeStores[i] + File.separator + relPathFile);
- splits.add(new FileSplit(node, new FileReference(f)));
+ int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(node);
+ // Use the outer loop index i directly: looping over nodeStores again here
+ // would add every split nodeStores.length times (quadratic duplication).
+ for (int k = 0; k < numIODevices; k++) {
+ File f = new File(nodeStores[i] + File.separator + relPathFile);
+ splits.add(new FileSplit(node, new FileReference(f), k));
+ }
}
}
return splits.toArray(new FileSplit[] {});
@@ -1419,21 +1428,6 @@
return dataverseName + File.separator + fileName;
}
- public Pair<IFileSplitProvider, IFileSplitProvider> getInvertedIndexFileSplitProviders(
- IFileSplitProvider splitProvider) {
- int numSplits = splitProvider.getFileSplits().length;
- FileSplit[] btreeSplits = new FileSplit[numSplits];
- FileSplit[] invListsSplits = new FileSplit[numSplits];
- for (int i = 0; i < numSplits; i++) {
- String nodeName = splitProvider.getFileSplits()[i].getNodeName();
- String path = splitProvider.getFileSplits()[i].getLocalFile().getFile().getPath();
- btreeSplits[i] = new FileSplit(nodeName, path + "_$btree");
- invListsSplits[i] = new FileSplit(nodeName, path + "_$invlists");
- }
- return new Pair<IFileSplitProvider, IFileSplitProvider>(new ConstantFileSplitProvider(btreeSplits),
- new ConstantFileSplitProvider(invListsSplits));
- }
-
public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException {
try {
return MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, dataset);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
index 7ab0d10..c59d854 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
@@ -6,6 +6,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
@@ -27,14 +28,27 @@
private final int[] bloomFilterKeyFields;
private final int memPageSize;
private final int memNumPages;
+ private FileSplit[] fileSplits;
+ private int ioDeviceID;
public LSMBTreeLocalResourceMetadata(ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories,
- int[] bloomFilterKeyFields, boolean isPrimary, int memPageSize, int memNumPages) {
+ int[] bloomFilterKeyFields, boolean isPrimary, int memPageSize, int memNumPages, FileSplit[] fileSplits) {
this.typeTraits = typeTraits;
this.cmpFactories = cmpFactories;
this.bloomFilterKeyFields = bloomFilterKeyFields;
this.memPageSize = memPageSize;
this.memNumPages = memNumPages;
+ this.fileSplits = fileSplits;
+ }
+
+ public LSMBTreeLocalResourceMetadata(ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories,
+ int[] bloomFilterKeyFields, boolean isPrimary, int memPageSize, int memNumPages, int ioDeviceID) {
+ this.typeTraits = typeTraits;
+ this.cmpFactories = cmpFactories;
+ this.bloomFilterKeyFields = bloomFilterKeyFields;
+ this.memPageSize = memPageSize;
+ this.memNumPages = memNumPages;
+ this.ioDeviceID = ioDeviceID;
}
@Override
@@ -45,13 +59,13 @@
memNumPages, new TransientFileMapManager());
ITreeIndexMetaDataFrameFactory metaDataFrameFactory = new LIFOMetaDataFrameFactory();
IInMemoryFreePageManager memFreePageManager = new InMemoryFreePageManager(memNumPages, metaDataFrameFactory);
- LSMBTree lsmBTree = LSMBTreeUtils.createLSMTree(memBufferCache, memFreePageManager,
- runtimeContextProvider.getIOManager(), file, runtimeContextProvider.getBufferCache(),
- runtimeContextProvider.getFileMapManager(), typeTraits, cmpFactories, bloomFilterKeyFields,
- runtimeContextProvider.getBloomFilterFalsePositiveRate(), runtimeContextProvider.getLSMMergePolicy(),
- runtimeContextProvider.getLSMBTreeOperationTrackerFactory(),
- runtimeContextProvider.getLSMIOScheduler(),
- runtimeContextProvider.getLSMBTreeIOOperationCallbackProvider(), partition);
+ LSMBTree lsmBTree = LSMBTreeUtils.createLSMTree(memBufferCache, memFreePageManager, runtimeContextProvider
+ .getIOManager(), file, runtimeContextProvider.getBufferCache(), runtimeContextProvider
+ .getFileMapManager(), typeTraits, cmpFactories, bloomFilterKeyFields, runtimeContextProvider
+ .getBloomFilterFalsePositiveRate(), runtimeContextProvider.getLSMMergePolicy(), runtimeContextProvider
+ .getLSMBTreeOperationTrackerFactory(), runtimeContextProvider.getLSMIOScheduler(),
+ runtimeContextProvider.getLSMBTreeIOOperationCallbackProvider(), fileSplits == null ? ioDeviceID
+ : fileSplits[partition].getIODeviceId());
return lsmBTree;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
index 3bcb747..c655770 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
@@ -4,6 +4,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
@@ -28,11 +29,12 @@
private final int memPageSize;
private final int memNumPages;
private final boolean isPartitioned;
+ private final FileSplit[] fileSplits;
public LSMInvertedIndexLocalResourceMetadata(ITypeTraits[] invListTypeTraits,
IBinaryComparatorFactory[] invListCmpFactories, ITypeTraits[] tokenTypeTraits,
IBinaryComparatorFactory[] tokenCmpFactories, IBinaryTokenizerFactory tokenizerFactory, int memPageSize,
- int memNumPages, boolean isPartitioned) {
+ int memNumPages, boolean isPartitioned, FileSplit[] fileSplits) {
this.invListTypeTraits = invListTypeTraits;
this.invListCmpFactories = invListCmpFactories;
this.tokenTypeTraits = tokenTypeTraits;
@@ -41,6 +43,7 @@
this.memPageSize = memPageSize;
this.memNumPages = memNumPages;
this.isPartitioned = isPartitioned;
+ this.fileSplits = fileSplits;
}
@Override
@@ -62,7 +65,8 @@
runtimeContextProvider.getLSMMergePolicy(),
runtimeContextProvider.getLSMInvertedIndexOperationTrackerFactory(),
runtimeContextProvider.getLSMIOScheduler(),
- runtimeContextProvider.getLSMInvertedIndexIOOperationCallbackProvider(), partition);
+ runtimeContextProvider.getLSMInvertedIndexIOOperationCallbackProvider(),
+ fileSplits[partition].getIODeviceId());
} else {
return InvertedIndexUtils.createLSMInvertedIndex(memBufferCache, memFreePageManager,
runtimeContextProvider.getFileMapManager(), invListTypeTraits, invListCmpFactories,
@@ -72,7 +76,8 @@
runtimeContextProvider.getLSMMergePolicy(),
runtimeContextProvider.getLSMInvertedIndexOperationTrackerFactory(),
runtimeContextProvider.getLSMIOScheduler(),
- runtimeContextProvider.getLSMInvertedIndexIOOperationCallbackProvider(), partition);
+ runtimeContextProvider.getLSMInvertedIndexIOOperationCallbackProvider(),
+ fileSplits[partition].getIODeviceId());
}
} catch (IndexException e) {
throw new HyracksDataException(e);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
index ff26c54..6c806c8 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
@@ -8,6 +8,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
@@ -33,11 +34,12 @@
private final ILinearizeComparatorFactory linearizeCmpFactory;
private final int memPageSize;
private final int memNumPages;
+ private final FileSplit[] fileSplits;
public LSMRTreeLocalResourceMetadata(ITypeTraits[] typeTraits, IBinaryComparatorFactory[] rtreeCmpFactories,
IBinaryComparatorFactory[] btreeCmpFactories, IPrimitiveValueProviderFactory[] valueProviderFactories,
RTreePolicyType rtreePolicyType, ILinearizeComparatorFactory linearizeCmpFactory, int memPageSize,
- int memNumPages) {
+ int memNumPages, FileSplit[] fileSplits) {
this.typeTraits = typeTraits;
this.rtreeCmpFactories = rtreeCmpFactories;
this.btreeCmpFactories = btreeCmpFactories;
@@ -46,6 +48,7 @@
this.linearizeCmpFactory = linearizeCmpFactory;
this.memPageSize = memPageSize;
this.memNumPages = memNumPages;
+ this.fileSplits = fileSplits;
}
@Override
@@ -66,7 +69,8 @@
runtimeContextProvider.getLSMMergePolicy(),
runtimeContextProvider.getLSMRTreeOperationTrackerFactory(),
runtimeContextProvider.getLSMIOScheduler(),
- runtimeContextProvider.getLSMRTreeIOOperationCallbackProvider(), linearizeCmpFactory, partition);
+ runtimeContextProvider.getLSMRTreeIOOperationCallbackProvider(), linearizeCmpFactory,
+ fileSplits[partition].getIODeviceId());
} catch (TreeIndexException e) {
throw new HyracksDataException(e);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 01dce6c..275e42e 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -28,8 +28,8 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
import edu.uci.ics.hyracks.storage.common.file.LocalResource;
import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactory;
@@ -37,25 +37,29 @@
public class PersistentLocalResourceRepository implements ILocalResourceRepository {
private static final Logger LOGGER = Logger.getLogger(PersistentLocalResourceRepository.class.getName());
- private final String mountPoint;
+ private final String[] mountPoints;
private static final String ROOT_METADATA_DIRECTORY = "asterix_root_metadata/";
private static final String ROOT_METADATA_FILE_NAME_PREFIX = ".asterix_root_metadata_";
private static final long ROOT_LOCAL_RESOURCE_ID = -4321;
private static final String METADATA_FILE_NAME = ".metadata";
private Map<String, LocalResource> name2ResourceMap = new HashMap<String, LocalResource>();
private Map<Long, LocalResource> id2ResourceMap = new HashMap<Long, LocalResource>();
- private String rootMetadataFileName;
- private String rootDir;
+ private final int numIODevices;
- public PersistentLocalResourceRepository(String mountPoint) throws HyracksDataException {
- File mountPointDir = new File(mountPoint);
- if (!mountPointDir.exists()) {
- throw new HyracksDataException(mountPointDir.getAbsolutePath() + "doesn't exist.");
- }
- if (!mountPoint.endsWith(System.getProperty("file.separator"))) {
- this.mountPoint = new String(mountPoint + System.getProperty("file.separator"));
- } else {
- this.mountPoint = new String(mountPoint);
+ public PersistentLocalResourceRepository(List<IODeviceHandle> devices) throws HyracksDataException {
+ numIODevices = devices.size();
+ this.mountPoints = new String[numIODevices];
+ for (int i = 0; i < numIODevices; i++) {
+ String mountPoint = devices.get(i).getPath().getPath();
+ File mountPointDir = new File(mountPoint);
+ if (!mountPointDir.exists()) {
+ throw new HyracksDataException(mountPointDir.getAbsolutePath() + " doesn't exist.");
+ }
+ if (!mountPoint.endsWith(System.getProperty("file.separator"))) {
+ mountPoints[i] = new String(mountPoint + System.getProperty("file.separator"));
+ } else {
+ mountPoints[i] = new String(mountPoint);
+ }
}
}
@@ -64,53 +68,40 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Initializing local resource repository ... ");
}
- LocalResource rootLocalResource = null;
- //#. if the rootMetadataFile doesn't exist, create it and return.
- rootMetadataFileName = new String(mountPoint + ROOT_METADATA_DIRECTORY + ROOT_METADATA_FILE_NAME_PREFIX
- + nodeId);
- File rootMetadataFile = new File(rootMetadataFileName);
if (isNewUniverse) {
- File rootMetadataDir = new File(mountPoint + ROOT_METADATA_DIRECTORY);
- if (!rootMetadataDir.exists()) {
- rootMetadataDir.mkdir();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("created the root-metadata-file's directory: " + rootMetadataDir.getAbsolutePath());
+ //#. if the rootMetadataFile doesn't exist, create it and return.
+ for (int i = 0; i < numIODevices; i++) {
+ String rootMetadataFileName = new String(mountPoints[i] + ROOT_METADATA_DIRECTORY
+ + ROOT_METADATA_FILE_NAME_PREFIX + nodeId);
+ File rootMetadataFile = new File(rootMetadataFileName);
+
+ File rootMetadataDir = new File(mountPoints[i] + ROOT_METADATA_DIRECTORY);
+ if (!rootMetadataDir.exists()) {
+ rootMetadataDir.mkdir();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("created the root-metadata-file's directory: " + rootMetadataDir.getAbsolutePath());
+ }
}
- }
- rootMetadataFile.delete();
- if (rootDir.startsWith(System.getProperty("file.separator"))) {
- this.rootDir = new String(mountPoint + rootDir.substring(System.getProperty("file.separator").length()));
- } else {
- this.rootDir = new String(mountPoint + rootDir);
- }
- rootLocalResource = new LocalResource(ROOT_LOCAL_RESOURCE_ID, rootMetadataFileName, 0, 0, this.rootDir);
- insert(rootLocalResource);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("created the root-metadata-file: " + rootMetadataFileName);
- }
-
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Completed the initialization of the local resource repository");
- }
- return;
- }
+ rootMetadataFile.delete();
+ String mountedRootDir;
+ if (rootDir.startsWith(System.getProperty("file.separator"))) {
+ mountedRootDir = new String(mountPoints[i]
+ + rootDir.substring(System.getProperty("file.separator").length()));
+ } else {
+ mountedRootDir = new String(mountPoints[i] + rootDir);
+ }
+ LocalResource rootLocalResource = new LocalResource(ROOT_LOCAL_RESOURCE_ID, rootMetadataFileName, 0, 0,
+ mountedRootDir);
+ insert(rootLocalResource, i);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("created the root-metadata-file: " + rootMetadataFileName);
+ }
- //#. if the rootMetadataFile exists, read it and set this.rootDir.
- rootLocalResource = readLocalResource(rootMetadataFile);
- this.rootDir = (String) rootLocalResource.getResourceObject();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("The root directory of the local resource repository is " + this.rootDir);
- }
-
- //#. load all local resources.
- File rootDirFile = new File(this.rootDir);
- if (!rootDirFile.exists()) {
- //rootDir may not exist if this node is not the metadata node and doesn't have any user data.
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("The root directory of the local resource repository doesn't exist: there is no local resource.");
- LOGGER.info("Completed the initialization of the local resource repository");
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Completed the initialization of the local resource repository");
+ }
}
return;
}
@@ -125,27 +116,50 @@
}
};
- long maxResourceId = 0;
- File[] dataverseFileList = rootDirFile.listFiles();
- if (dataverseFileList == null) {
- throw new HyracksDataException("Metadata dataverse doesn't exist.");
- }
- for (File dataverseFile : dataverseFileList) {
- if (dataverseFile.isDirectory()) {
- File[] indexFileList = dataverseFile.listFiles();
- if (indexFileList != null) {
- for (File indexFile : indexFileList) {
- if (indexFile.isDirectory()) {
- File[] metadataFiles = indexFile.listFiles(filter);
- if (metadataFiles != null) {
- for (File metadataFile : metadataFiles) {
- LocalResource localResource = readLocalResource(metadataFile);
- id2ResourceMap.put(localResource.getResourceId(), localResource);
- name2ResourceMap.put(localResource.getResourceName(), localResource);
- maxResourceId = Math.max(localResource.getResourceId(), maxResourceId);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("loaded local resource - [id: " + localResource.getResourceId()
- + ", name: " + localResource.getResourceName() + "]");
+ for (int i = 0; i < numIODevices; i++) {
+ String rootMetadataFileName = new String(mountPoints[i] + ROOT_METADATA_DIRECTORY
+ + ROOT_METADATA_FILE_NAME_PREFIX + nodeId);
+ File rootMetadataFile = new File(rootMetadataFileName);
+ //#. if the rootMetadataFile exists, read it and set this.rootDir.
+ LocalResource rootLocalResource = readLocalResource(rootMetadataFile);
+ String mountedRootDir = (String) rootLocalResource.getResourceObject();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("The root directory of the local resource repository is " + mountedRootDir);
+ }
+
+ //#. load all local resources.
+ File rootDirFile = new File(mountedRootDir);
+ if (!rootDirFile.exists()) {
+ //rootDir may not exist if this node is not the metadata node and doesn't have any user data.
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("The root directory of the local resource repository doesn't exist: there is no local resource.");
+ LOGGER.info("Completed the initialization of the local resource repository");
+ }
+ continue;
+ }
+
+ long maxResourceId = 0; // NOTE(review): reset per device, so initId below keeps only the LAST device's max — ids on earlier devices may collide; hoist outside the loop. TODO confirm
+ File[] dataverseFileList = rootDirFile.listFiles();
+ if (dataverseFileList == null) {
+ throw new HyracksDataException("Metadata dataverse doesn't exist.");
+ }
+ for (File dataverseFile : dataverseFileList) {
+ if (dataverseFile.isDirectory()) {
+ File[] indexFileList = dataverseFile.listFiles();
+ if (indexFileList != null) {
+ for (File indexFile : indexFileList) {
+ if (indexFile.isDirectory()) {
+ File[] metadataFiles = indexFile.listFiles(filter);
+ if (metadataFiles != null) {
+ for (File metadataFile : metadataFiles) {
+ LocalResource localResource = readLocalResource(metadataFile);
+ id2ResourceMap.put(localResource.getResourceId(), localResource);
+ name2ResourceMap.put(localResource.getResourceName(), localResource);
+ maxResourceId = Math.max(localResource.getResourceId(), maxResourceId);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("loaded local resource - [id: " + localResource.getResourceId()
+ + ", name: " + localResource.getResourceName() + "]");
+ }
}
}
}
@@ -153,11 +167,11 @@
}
}
}
- }
- resourceIdFactory.initId(maxResourceId + 1);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("The resource id factory is intialized with the value: " + (maxResourceId + 1));
- LOGGER.info("Completed the initialization of the local resource repository");
+ resourceIdFactory.initId(maxResourceId + 1);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("The resource id factory is initialized with the value: " + (maxResourceId + 1));
+ LOGGER.info("Completed the initialization of the local resource repository");
+ }
}
}
@@ -172,7 +186,7 @@
}
@Override
- public synchronized void insert(LocalResource resource) throws HyracksDataException {
+ public synchronized void insert(LocalResource resource, int ioDeviceId) throws HyracksDataException {
long id = resource.getResourceId();
if (id2ResourceMap.containsKey(id)) {
@@ -186,8 +200,10 @@
FileOutputStream fos = null;
ObjectOutputStream oosToFos = null;
+
try {
- fos = new FileOutputStream(getFileName(mountPoint, resource.getResourceName(), resource.getResourceId()));
+ fos = new FileOutputStream(getFileName(mountPoints[ioDeviceId], resource.getResourceName(),
+ resource.getResourceId()));
oosToFos = new ObjectOutputStream(fos);
oosToFos.writeObject(resource);
oosToFos.flush();
@@ -212,26 +228,26 @@
}
@Override
- public synchronized void deleteResourceById(long id) throws HyracksDataException {
+ public synchronized void deleteResourceById(long id, int ioDeviceId) throws HyracksDataException {
LocalResource resource = id2ResourceMap.get(id);
if (resource == null) {
throw new HyracksDataException("Resource doesn't exist");
}
id2ResourceMap.remove(id);
name2ResourceMap.remove(resource.getResourceName());
- File file = new File(getFileName(mountPoint, resource.getResourceName(), resource.getResourceId()));
+ File file = new File(getFileName(mountPoints[ioDeviceId], resource.getResourceName(), resource.getResourceId()));
file.delete();
}
@Override
- public synchronized void deleteResourceByName(String name) throws HyracksDataException {
+ public synchronized void deleteResourceByName(String name, int ioDeviceId) throws HyracksDataException {
LocalResource resource = name2ResourceMap.get(name);
if (resource == null) {
throw new HyracksDataException("Resource doesn't exist");
}
id2ResourceMap.remove(resource.getResourceId());
name2ResourceMap.remove(name);
- File file = new File(getFileName(mountPoint, resource.getResourceName(), resource.getResourceId()));
+ File file = new File(getFileName(mountPoints[ioDeviceId], resource.getResourceName(), resource.getResourceId()));
file.delete();
}
@@ -286,4 +302,4 @@
}
}
}
-}
+}
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
index c7efca5..f6847f9 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
@@ -14,11 +14,8 @@
*/
package edu.uci.ics.asterix.transaction.management.resource;
-import java.util.List;
-
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.IIOManager;
-import edu.uci.ics.hyracks.api.io.IODeviceHandle;
import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
@@ -31,7 +28,6 @@
@Override
public ILocalResourceRepository createRepository() throws HyracksDataException {
- List<IODeviceHandle> devices = ioManager.getIODevices();
- return new PersistentLocalResourceRepository(devices.get(0).getPath().getPath());
+ return new PersistentLocalResourceRepository(ioManager.getIODevices());
}
-}
+}
\ No newline at end of file