checkpoint towards fixing LSN related issues(issue 591, 609, and 614) and more
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 56fc0e7..af503a6 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -220,17 +220,15 @@
dataflowHelperFactory = new LSMInvertedIndexDataflowHelperFactory(
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate());
} else {
dataflowHelperFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate());
}
LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp = new LSMInvertedIndexSearchOperatorDescriptor(
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
index ee9dfae..b528381 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
@@ -18,10 +18,8 @@
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
-import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -95,26 +93,6 @@
}
@Override
- public ILSMIOOperationCallbackProvider getNoOpIOOperationCallbackProvider() {
- return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
- }
-
- @Override
- public ILSMIOOperationCallbackProvider getLSMBTreeIOOperationCallbackProvider(boolean isPrimary) {
- return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
- }
-
- @Override
- public ILSMIOOperationCallbackProvider getLSMRTreeIOOperationCallbackProvider() {
- return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
- }
-
- @Override
- public ILSMIOOperationCallbackProvider getLSMInvertedIndexIOOperationCallbackProvider() {
- return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
- }
-
- @Override
public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) {
return asterixAppRuntimeContext.getLSMBTreeOperationTracker(datasetID);
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
index 3e41a77..ccc5e85 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
@@ -30,6 +30,7 @@
import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import edu.uci.ics.asterix.formats.base.IDataFormat;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
@@ -127,8 +128,7 @@
splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate()));
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop,
splitsAndConstraint.second);
@@ -180,7 +180,7 @@
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new PrimaryIndexOperationTrackerProvider(
dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
+ LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
.getBloomFilterFalsePositiveRate()), localResourceFactoryProvider,
NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
@@ -267,7 +267,7 @@
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate()), NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeBulkLoad,
splitsAndConstraint.second);
@@ -293,7 +293,7 @@
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate()), NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeBulkLoad,
splitsAndConstraint.second);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
index de4d075..e090c2c 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
@@ -59,17 +59,16 @@
metadataProvider, physicalOptimizationConfig);
return secondaryIndexCreator.buildLoadingJobSpec();
}
-
- public static void addExternalDatasetFilesToMetadata(AqlMetadataProvider metadataProvider,
- Dataset dataset) throws AlgebricksException, MetadataException{
- //get the file list
- ArrayList<ExternalFile> files = metadataProvider.getExternalDatasetFiles(dataset);
- //add files to metadata
- for(int i=0; i < files.size(); i++)
- {
- MetadataManager.INSTANCE.addExternalFile(metadataProvider.getMetadataTxnContext(), files.get(i));
- }
- }
+
+ public static void addExternalDatasetFilesToMetadata(AqlMetadataProvider metadataProvider, Dataset dataset)
+ throws AlgebricksException, MetadataException {
+ //get the file list
+ ArrayList<ExternalFile> files = metadataProvider.getExternalDatasetFiles(dataset);
+ //add files to metadata
+ for (int i = 0; i < files.size(); i++) {
+ MetadataManager.INSTANCE.addExternalFile(metadataProvider.getMetadataTxnContext(), files.get(i));
+ }
+ }
public static JobSpecification buildDropSecondaryIndexJobSpec(CompiledIndexDropStatement indexDropStmt,
AqlMetadataProvider metadataProvider, Dataset dataset) throws AlgebricksException, MetadataException {
@@ -86,9 +85,8 @@
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- new SecondaryIndexOperationTrackerProvider(LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset
- .getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate()));
AlgebricksPartitionConstraintHelper
.setPartitionConstraintInJobSpec(spec, btreeDrop, splitsAndConstraint.second);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
index baf16de..55cccbc 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
@@ -16,17 +16,16 @@
import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
-import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
+import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
-
import edu.uci.ics.asterix.external.data.operator.ExternalDataIndexingOperatorDescriptor;
import edu.uci.ics.asterix.external.util.ExternalIndexHashPartitionComputerFactory;
import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
import edu.uci.ics.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
@@ -60,24 +59,23 @@
super(physOptConf, propertiesProvider);
}
- @Override
- public JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException {
- JobSpecification spec = JobSpecificationUtils.createJobSpecification();
- AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
- //prepare a LocalResourceMetadata which will be stored in NC's local resource repository
- ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(
- secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, secondaryBloomFilterKeyFields, true,
- dataset.getDatasetId());
- ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
- localResourceMetadata, LocalResource.LSMBTreeResource);
+ @Override
+ public JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException {
+ JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+ AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
+ //prepare a LocalResourceMetadata which will be stored in NC's local resource repository
+ ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(
+ secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, secondaryBloomFilterKeyFields, true,
+ dataset.getDatasetId());
+ ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
+ localResourceMetadata, LocalResource.LSMBTreeResource);
TreeIndexCreateOperatorDescriptor secondaryIndexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
secondaryFileSplitProvider, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories,
secondaryBloomFilterKeyFields, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- new SecondaryIndexOperationTrackerProvider(LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset
- .getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate()), localResourceFactoryProvider,
NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
@@ -87,117 +85,112 @@
return spec;
}
+ @Override
+ public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException {
+ if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+ JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+ Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint> RIDScanOpAndConstraints;
+ AlgebricksMetaOperatorDescriptor asterixAssignOp;
+ try {
+ //create external indexing scan operator
+ RIDScanOpAndConstraints = createExternalIndexingOp(spec);
- @Override
- public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException{
- if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
- JobSpecification spec = JobSpecificationUtils.createJobSpecification();
- Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint> RIDScanOpAndConstraints;
- AlgebricksMetaOperatorDescriptor asterixAssignOp;
- try
- {
- //create external indexing scan operator
- RIDScanOpAndConstraints = createExternalIndexingOp(spec);
+ //create assign operator
+ asterixAssignOp = createExternalAssignOp(spec);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
+ RIDScanOpAndConstraints.second);
+ } catch (Exception e) {
+ throw new AsterixException("Failed to create external index scanning and loading job");
+ }
- //create assign operator
- asterixAssignOp = createExternalAssignOp(spec);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
- RIDScanOpAndConstraints.second);
- }
- catch(Exception e)
- {
- throw new AsterixException("Failed to create external index scanning and loading job");
- }
+ // If any of the secondary fields are nullable, then add a select op that filters nulls.
+ AlgebricksMetaOperatorDescriptor selectOp = null;
+ if (anySecondaryKeyIsNullable) {
+ selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys);
+ }
- // If any of the secondary fields are nullable, then add a select op that filters nulls.
- AlgebricksMetaOperatorDescriptor selectOp = null;
- if (anySecondaryKeyIsNullable) {
- selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys);
- }
+ // Sort by secondary keys.
+ ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc,
+ RIDScanOpAndConstraints.second);
+ AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
+ // Create secondary BTree bulk load op.
+ TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(
+ spec,
+ numSecondaryKeys,
+ new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
+ .getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
+ IBinaryHashFunctionFactory[] hashFactories = DatasetUtils.computeExternalDataKeysBinaryHashFunFactories(
+ dataset, NonTaggedDataFormat.INSTANCE.getBinaryHashFunctionFactoryProvider());
- // Sort by secondary keys.
- ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc,RIDScanOpAndConstraints.second);
- AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
- // Create secondary BTree bulk load op.
- TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(
- spec,
- numSecondaryKeys,
- new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
- LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
- .getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
- IBinaryHashFunctionFactory[] hashFactories = DatasetUtils.computeExternalDataKeysBinaryHashFunFactories(dataset, NonTaggedDataFormat.INSTANCE.getBinaryHashFunctionFactoryProvider());
+ //select partitioning keys (always the first 2 after secondary keys)
+ int[] keys = new int[2];
+ keys[0] = numSecondaryKeys;
+ keys[1] = numSecondaryKeys + 1;
- //select partitioning keys (always the first 2 after secondary keys)
- int[] keys = new int[2];
- keys[0] = numSecondaryKeys;
- keys[1] = numSecondaryKeys + 1;
+ IConnectorDescriptor hashConn = new MToNPartitioningConnectorDescriptor(spec,
+ new ExternalIndexHashPartitionComputerFactory(keys, hashFactories));
- IConnectorDescriptor hashConn = new MToNPartitioningConnectorDescriptor(spec,
- new ExternalIndexHashPartitionComputerFactory(keys, hashFactories));
+ spec.connect(new OneToOneConnectorDescriptor(spec), RIDScanOpAndConstraints.first, 0, asterixAssignOp, 0);
+ if (anySecondaryKeyIsNullable) {
+ spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
+ spec.connect(hashConn, selectOp, 0, sortOp, 0);
+ } else {
+ spec.connect(hashConn, asterixAssignOp, 0, sortOp, 0);
+ }
+ spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
+ spec.addRoot(secondaryBulkLoadOp);
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ return spec;
+ } else {
+ JobSpecification spec = JobSpecificationUtils.createJobSpecification();
- spec.connect(new OneToOneConnectorDescriptor(spec), RIDScanOpAndConstraints.first, 0, asterixAssignOp, 0);
- if (anySecondaryKeyIsNullable) {
- spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
- spec.connect(hashConn, selectOp, 0, sortOp, 0);
- } else {
- spec.connect(hashConn, asterixAssignOp, 0, sortOp, 0);
- }
- spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
- spec.addRoot(secondaryBulkLoadOp);
- spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
- return spec;
- }
- else
- {
- JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+ // Create dummy key provider for feeding the primary index scan.
+ AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
- // Create dummy key provider for feeding the primary index scan.
- AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
+ // Create primary index scan op.
+ BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
- // Create primary index scan op.
- BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
+ // Assign op.
+ AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, primaryScanOp, numSecondaryKeys);
- // Assign op.
- AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, primaryScanOp, numSecondaryKeys);
+ // If any of the secondary fields are nullable, then add a select op that filters nulls.
+ AlgebricksMetaOperatorDescriptor selectOp = null;
+ if (anySecondaryKeyIsNullable) {
+ selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys);
+ }
- // If any of the secondary fields are nullable, then add a select op that filters nulls.
- AlgebricksMetaOperatorDescriptor selectOp = null;
- if (anySecondaryKeyIsNullable) {
- selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys);
- }
+ // Sort by secondary keys.
+ ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc);
- // Sort by secondary keys.
- ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc);
+ AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
+ // Create secondary BTree bulk load op.
+ TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(
+ spec,
+ numSecondaryKeys,
+ new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
+ .getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
- AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
- // Create secondary BTree bulk load op.
- TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(
- spec,
- numSecondaryKeys,
- new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
- LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
- .getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
-
- // Connect the operators.
- spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
- if (anySecondaryKeyIsNullable) {
- spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0);
- } else {
- spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
- }
- spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
- spec.addRoot(secondaryBulkLoadOp);
- spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
- return spec;
- }
- }
+ // Connect the operators.
+ spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
+ if (anySecondaryKeyIsNullable) {
+ spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0);
+ } else {
+ spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
+ }
+ spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
+ spec.addRoot(secondaryBulkLoadOp);
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ return spec;
+ }
+ }
}
-
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
index 5da336f..1418c32 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
@@ -19,9 +19,6 @@
import java.io.IOException;
import java.util.List;
-import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
-import edu.uci.ics.asterix.external.data.operator.ExternalDataIndexingOperatorDescriptor;
-import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
@@ -29,25 +26,30 @@
import edu.uci.ics.asterix.common.context.ITransactionSubsystemProvider;
import edu.uci.ics.asterix.common.context.TransactionSubsystemProvider;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import edu.uci.ics.asterix.common.transactions.IRecoveryManager.ResourceType;
import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
+import edu.uci.ics.asterix.external.data.operator.ExternalDataIndexingOperatorDescriptor;
+import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
import edu.uci.ics.asterix.formats.nontagged.AqlBinaryBooleanInspectorImpl;
import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
-import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
-import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
import edu.uci.ics.asterix.runtime.evaluators.functions.AndDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.IsNullDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.NotDescriptor;
+import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
import edu.uci.ics.asterix.runtime.job.listener.JobEventListenerFactory;
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
@@ -83,7 +85,6 @@
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
@@ -91,7 +92,6 @@
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
-
@SuppressWarnings("rawtypes")
// TODO: We should eventually have a hierarchy of classes that can create all
// possible index job specs,
@@ -165,68 +165,67 @@
public abstract JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException;
protected void init(CompiledCreateIndexStatement createIndexStmt, AqlMetadataProvider metadataProvider)
- throws AsterixException, AlgebricksException {
- this.metadataProvider = metadataProvider;
- dataverseName = createIndexStmt.getDataverseName() == null ? metadataProvider.getDefaultDataverseName()
- : createIndexStmt.getDataverseName();
- datasetName = createIndexStmt.getDatasetName();
- secondaryIndexName = createIndexStmt.getIndexName();
- dataset = metadataProvider.findDataset(dataverseName, datasetName);
- if (dataset == null) {
- throw new AsterixException("Unknown dataset " + datasetName);
- }
- if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
- //get external dataset details
- ExternalDatasetDetails edsd = ((ExternalDatasetDetails)dataset.getDatasetDetails());
- //get adapter name
- String adapter = edsd.getAdapter();
- //if not an hdfs adapter, throw an exception
- if(!adapter.equals(HDFSAdapterFactory.HDFS_ADAPTER_NAME) && !adapter.equals(HiveAdapter.class.getName()))
- {
- throw new AsterixException("Cannot index an external dataset with adapter type(" + adapter + ").");
- }
- //get the item type
- ARecordType externalItemType = (ARecordType) metadataProvider.findType(dataset.getDataverseName(), dataset.getItemTypeName());
- //number of primary keys here depends on the file input, 3 for rcfiles and 2 for text and sequence files.
- numPrimaryKeys = DatasetUtils.getExternalRIDSize(dataset);
- itemType = createExternalItemTypeWithRID(externalItemType);
- payloadSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
- numSecondaryKeys = createIndexStmt.getKeyFields().size();
- //splits and constraints <--They don't exist-->
- primaryFileSplitProvider = null;
- primaryPartitionConstraint = null;
- //create secondary split and constraints
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
- .splitProviderAndPartitionConstraintsForExternalDataset(dataverseName, datasetName,
- secondaryIndexName);
- secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
- secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
- // Must be called in this order.
- setExternalRIDDescAndComparators();
- setExternalSecondaryRecDescAndComparators(createIndexStmt, metadataProvider);
- numElementsHint = metadataProvider.getCardinalityPerPartitionHint(dataset);
- }
- else
- {
- itemType = (ARecordType) metadataProvider.findType(dataset.getDataverseName(), dataset.getItemTypeName());
- payloadSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
- numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
- numSecondaryKeys = createIndexStmt.getKeyFields().size();
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverseName, datasetName, datasetName);
- primaryFileSplitProvider = primarySplitsAndConstraint.first;
- primaryPartitionConstraint = primarySplitsAndConstraint.second;
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverseName, datasetName,
- secondaryIndexName);
- secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
- secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
- // Must be called in this order.
- setPrimaryRecDescAndComparators();
- setSecondaryRecDescAndComparators(createIndexStmt, metadataProvider);
- numElementsHint = metadataProvider.getCardinalityPerPartitionHint(dataset);
- }
+ throws AsterixException, AlgebricksException {
+ this.metadataProvider = metadataProvider;
+ dataverseName = createIndexStmt.getDataverseName() == null ? metadataProvider.getDefaultDataverseName()
+ : createIndexStmt.getDataverseName();
+ datasetName = createIndexStmt.getDatasetName();
+ secondaryIndexName = createIndexStmt.getIndexName();
+ dataset = metadataProvider.findDataset(dataverseName, datasetName);
+ if (dataset == null) {
+ throw new AsterixException("Unknown dataset " + datasetName);
}
+ if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+ //get external dataset details
+ ExternalDatasetDetails edsd = ((ExternalDatasetDetails) dataset.getDatasetDetails());
+ //get adapter name
+ String adapter = edsd.getAdapter();
+ //if not an hdfs adapter, throw an exception
+ if (!adapter.equals(HDFSAdapterFactory.HDFS_ADAPTER_NAME) && !adapter.equals(HiveAdapter.class.getName())) {
+ throw new AsterixException("Cannot index an external dataset with adapter type(" + adapter + ").");
+ }
+ //get the item type
+ ARecordType externalItemType = (ARecordType) metadataProvider.findType(dataset.getDataverseName(),
+ dataset.getItemTypeName());
+ //number of primary keys here depends on the file input, 3 for rcfiles and 2 for text and sequence files.
+ numPrimaryKeys = DatasetUtils.getExternalRIDSize(dataset);
+ itemType = createExternalItemTypeWithRID(externalItemType);
+ payloadSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
+ numSecondaryKeys = createIndexStmt.getKeyFields().size();
+ //splits and constraints <--They don't exist-->
+ primaryFileSplitProvider = null;
+ primaryPartitionConstraint = null;
+ //create secondary split and constraints
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
+ .splitProviderAndPartitionConstraintsForExternalDataset(dataverseName, datasetName,
+ secondaryIndexName);
+ secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
+ secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
+ // Must be called in this order.
+ setExternalRIDDescAndComparators();
+ setExternalSecondaryRecDescAndComparators(createIndexStmt, metadataProvider);
+ numElementsHint = metadataProvider.getCardinalityPerPartitionHint(dataset);
+ } else {
+ itemType = (ARecordType) metadataProvider.findType(dataset.getDataverseName(), dataset.getItemTypeName());
+ payloadSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
+ numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
+ numSecondaryKeys = createIndexStmt.getKeyFields().size();
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverseName, datasetName,
+ datasetName);
+ primaryFileSplitProvider = primarySplitsAndConstraint.first;
+ primaryPartitionConstraint = primarySplitsAndConstraint.second;
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverseName, datasetName,
+ secondaryIndexName);
+ secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
+ secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
+ // Must be called in this order.
+ setPrimaryRecDescAndComparators();
+ setSecondaryRecDescAndComparators(createIndexStmt, metadataProvider);
+ numElementsHint = metadataProvider.getCardinalityPerPartitionHint(dataset);
+ }
+ }
protected void setPrimaryRecDescAndComparators() throws AlgebricksException {
List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
@@ -310,209 +309,195 @@
return keyProviderOp;
}
-protected ARecordType createExternalItemTypeWithRID(
- ARecordType externalItemType) throws AsterixException {
+ protected ARecordType createExternalItemTypeWithRID(ARecordType externalItemType) throws AsterixException {
- String[] fieldsNames = new String[externalItemType.getFieldNames().length+numPrimaryKeys];
- IAType[] fieldsTypes = new IAType[externalItemType.getFieldTypes().length+numPrimaryKeys];
+ String[] fieldsNames = new String[externalItemType.getFieldNames().length + numPrimaryKeys];
+ IAType[] fieldsTypes = new IAType[externalItemType.getFieldTypes().length + numPrimaryKeys];
- //add RID fields names and types
- if(AqlMetadataProvider.isOptimizeExternalIndexes())
- {
- fieldsNames[0] = "_file-number";
- fieldsTypes[0] = BuiltinType.AINT32;
- }
- else
- {
- fieldsNames[0] = "_file-name";
- fieldsTypes[0] = BuiltinType.ASTRING;
- }
- fieldsNames[1] = "_byte-location";
- fieldsTypes[1] = BuiltinType.AINT64;
- if(numPrimaryKeys == 3)
- {
- //add the row number for rc files
- fieldsNames[2] = "_row-number";
- fieldsTypes[2] = BuiltinType.AINT32;
- }
-
- //add the original fields names and types
- for(int i=0; i < externalItemType.getFieldNames().length; i++)
- {
- fieldsNames[i+numPrimaryKeys] = externalItemType.getFieldNames()[i];
- fieldsTypes[i+numPrimaryKeys] = externalItemType.getFieldTypes()[i];
- }
- return new ARecordType(externalItemType.getTypeName(), fieldsNames, fieldsTypes, externalItemType.isOpen());
+ //add RID fields names and types
+ if (AqlMetadataProvider.isOptimizeExternalIndexes()) {
+ fieldsNames[0] = "_file-number";
+ fieldsTypes[0] = BuiltinType.AINT32;
+ } else {
+ fieldsNames[0] = "_file-name";
+ fieldsTypes[0] = BuiltinType.ASTRING;
+ }
+ fieldsNames[1] = "_byte-location";
+ fieldsTypes[1] = BuiltinType.AINT64;
+ if (numPrimaryKeys == 3) {
+ //add the row number for rc files
+ fieldsNames[2] = "_row-number";
+ fieldsTypes[2] = BuiltinType.AINT32;
}
- protected void setExternalRIDDescAndComparators() throws AlgebricksException {
+ //add the original fields names and types
+ for (int i = 0; i < externalItemType.getFieldNames().length; i++) {
+ fieldsNames[i + numPrimaryKeys] = externalItemType.getFieldNames()[i];
+ fieldsTypes[i + numPrimaryKeys] = externalItemType.getFieldTypes()[i];
+ }
+ return new ARecordType(externalItemType.getTypeName(), fieldsNames, fieldsTypes, externalItemType.isOpen());
+ }
- ISerializerDeserializer[] externalRecFields = new ISerializerDeserializer[itemType.getFieldNames().length];
- ITypeTraits[] externalTypeTraits = new ITypeTraits[itemType.getFieldNames().length];
+ protected void setExternalRIDDescAndComparators() throws AlgebricksException {
- primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
- primaryBloomFilterKeyFields = new int[numPrimaryKeys];
- ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
-
- if(AqlMetadataProvider.isOptimizeExternalIndexes())
- {
- primaryComparatorFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.AINT32, true);
- }
- else
- {
- primaryComparatorFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.ASTRING, true);
- }
- primaryComparatorFactories[1] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.AINT64, true);
+ ISerializerDeserializer[] externalRecFields = new ISerializerDeserializer[itemType.getFieldNames().length];
+ ITypeTraits[] externalTypeTraits = new ITypeTraits[itemType.getFieldNames().length];
- primaryBloomFilterKeyFields[0]=0;
- primaryBloomFilterKeyFields[1]=1;
+ primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
+ primaryBloomFilterKeyFields = new int[numPrimaryKeys];
+ ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
- if(numPrimaryKeys == 3)
- {
- primaryComparatorFactories[2] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.AINT32, true);
- primaryBloomFilterKeyFields[2]=2;
- }
+ if (AqlMetadataProvider.isOptimizeExternalIndexes()) {
+ primaryComparatorFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+ BuiltinType.AINT32, true);
+ } else {
+ primaryComparatorFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+ BuiltinType.ASTRING, true);
+ }
+ primaryComparatorFactories[1] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+ BuiltinType.AINT64, true);
- for(int i=0; i < itemType.getFieldNames().length; i++)
- {
- externalRecFields[i] = serdeProvider.getSerializerDeserializer(itemType.getFieldTypes()[i]);
- externalTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType.getFieldTypes()[i]);
- }
- primaryRecDesc = new RecordDescriptor(externalRecFields, externalTypeTraits);
+ primaryBloomFilterKeyFields[0] = 0;
+ primaryBloomFilterKeyFields[1] = 1;
+
+ if (numPrimaryKeys == 3) {
+ primaryComparatorFactories[2] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+ BuiltinType.AINT32, true);
+ primaryBloomFilterKeyFields[2] = 2;
}
-protected void setExternalSecondaryRecDescAndComparators(CompiledCreateIndexStatement createIndexStmt,
- AqlMetadataProvider metadataProvider) throws AlgebricksException, AsterixException {
- secondaryKeyFields = createIndexStmt.getKeyFields();
- secondaryFieldAccessEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeys+ numPrimaryKeys];
- secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys + numPrimaryKeys];
- secondaryBloomFilterKeyFields = new int[numSecondaryKeys];
- ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
- ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
- ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
- ITypeTraitProvider typeTraitProvider = metadataProvider.getFormat().getTypeTraitProvider();
- IBinaryComparatorFactoryProvider comparatorFactoryProvider = metadataProvider.getFormat()
- .getBinaryComparatorFactoryProvider();
+ for (int i = 0; i < itemType.getFieldNames().length; i++) {
+ externalRecFields[i] = serdeProvider.getSerializerDeserializer(itemType.getFieldTypes()[i]);
+ externalTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType.getFieldTypes()[i]);
+ }
+ primaryRecDesc = new RecordDescriptor(externalRecFields, externalTypeTraits);
+ }
- for (int i = 0; i < numSecondaryKeys; i++) {
- secondaryFieldAccessEvalFactories[i] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
- itemType, secondaryKeyFields.get(i), 0);
- Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(i), itemType);
- IAType keyType = keyTypePair.first;
- anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second;
- ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
- secondaryRecFields[i] = keySerde;
- secondaryComparatorFactories[i] = comparatorFactoryProvider.getBinaryComparatorFactory(keyType, true);
- secondaryTypeTraits[i] = typeTraitProvider.getTypeTrait(keyType);
- secondaryBloomFilterKeyFields[i] = i;
- }
+ protected void setExternalSecondaryRecDescAndComparators(CompiledCreateIndexStatement createIndexStmt,
+ AqlMetadataProvider metadataProvider) throws AlgebricksException, AsterixException {
+ secondaryKeyFields = createIndexStmt.getKeyFields();
+ secondaryFieldAccessEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeys + numPrimaryKeys];
+ secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys + numPrimaryKeys];
+ secondaryBloomFilterKeyFields = new int[numSecondaryKeys];
+ ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
+ ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
+ ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
+ ITypeTraitProvider typeTraitProvider = metadataProvider.getFormat().getTypeTraitProvider();
+ IBinaryComparatorFactoryProvider comparatorFactoryProvider = metadataProvider.getFormat()
+ .getBinaryComparatorFactoryProvider();
- if(AqlMetadataProvider.isOptimizeExternalIndexes())
- {
- secondaryFieldAccessEvalFactories[numSecondaryKeys] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
- itemType, "_file-number", 0);
- }
- else
- {
- secondaryFieldAccessEvalFactories[numSecondaryKeys] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
- itemType, "_file-name", 0);
- }
- secondaryFieldAccessEvalFactories[numSecondaryKeys+1] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
- itemType, "_byte-location", 0);
- if(numPrimaryKeys == 3)
- {
- secondaryFieldAccessEvalFactories[numSecondaryKeys+2] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
- itemType, "_row-number", 0);
- }
-
- for (int i = 0; i < numPrimaryKeys; i++) {
- secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
- secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
- secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
- }
- secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
+ for (int i = 0; i < numSecondaryKeys; i++) {
+ secondaryFieldAccessEvalFactories[i] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
+ itemType, secondaryKeyFields.get(i), 0);
+ Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(i), itemType);
+ IAType keyType = keyTypePair.first;
+ anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second;
+ ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
+ secondaryRecFields[i] = keySerde;
+ secondaryComparatorFactories[i] = comparatorFactoryProvider.getBinaryComparatorFactory(keyType, true);
+ secondaryTypeTraits[i] = typeTraitProvider.getTypeTrait(keyType);
+ secondaryBloomFilterKeyFields[i] = i;
}
-protected Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint> createExternalIndexingOp(JobSpecification spec) throws Exception {
- Pair<ExternalDataIndexingOperatorDescriptor,AlgebricksPartitionConstraint> indexingOpAndConstraints = metadataProvider.buildExternalDataIndexingRuntime(spec, itemType, dataset, NonTaggedDataFormat.INSTANCE);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexingOpAndConstraints.first,
- indexingOpAndConstraints.second);
- return indexingOpAndConstraints;
+ if (AqlMetadataProvider.isOptimizeExternalIndexes()) {
+ secondaryFieldAccessEvalFactories[numSecondaryKeys] = metadataProvider.getFormat()
+ .getFieldAccessEvaluatorFactory(itemType, "_file-number", 0);
+ } else {
+ secondaryFieldAccessEvalFactories[numSecondaryKeys] = metadataProvider.getFormat()
+ .getFieldAccessEvaluatorFactory(itemType, "_file-name", 0);
+ }
+ secondaryFieldAccessEvalFactories[numSecondaryKeys + 1] = metadataProvider.getFormat()
+ .getFieldAccessEvaluatorFactory(itemType, "_byte-location", 0);
+ if (numPrimaryKeys == 3) {
+ secondaryFieldAccessEvalFactories[numSecondaryKeys + 2] = metadataProvider.getFormat()
+ .getFieldAccessEvaluatorFactory(itemType, "_row-number", 0);
}
-protected AlgebricksMetaOperatorDescriptor createExternalAssignOp(JobSpecification spec) throws AlgebricksException {
- int[] outColumns = new int[numSecondaryKeys + numPrimaryKeys];
- int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
- for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++) {
- outColumns[i] = i;
- projectionList[i] = i;
- }
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
+ secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
+ secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
+ }
+ secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
+ }
- IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
- for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) {
- sefs[i] = new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
- secondaryFieldAccessEvalFactories[i]);
- }
- AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
- AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
- new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
- return asterixAssignOp;
+ protected Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint> createExternalIndexingOp(
+ JobSpecification spec) throws Exception {
+ Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint> indexingOpAndConstraints = metadataProvider
+ .buildExternalDataIndexingRuntime(spec, itemType, dataset, NonTaggedDataFormat.INSTANCE);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexingOpAndConstraints.first,
+ indexingOpAndConstraints.second);
+ return indexingOpAndConstraints;
+ }
+
+ protected AlgebricksMetaOperatorDescriptor createExternalAssignOp(JobSpecification spec) throws AlgebricksException {
+ int[] outColumns = new int[numSecondaryKeys + numPrimaryKeys];
+ int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
+ for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++) {
+ outColumns[i] = i;
+ projectionList[i] = i;
}
- protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec,
- IBinaryComparatorFactory[] secondaryComparatorFactories, RecordDescriptor secondaryRecDesc,
- AlgebricksPartitionConstraint partitionConstraints) {
- int[] sortFields = new int[secondaryComparatorFactories.length];
- for (int i = 0; i < secondaryComparatorFactories.length; i++) {
- sortFields[i] = i;
- }
- ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
- physOptConf.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories, secondaryRecDesc);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, partitionConstraints);
- return sortOp;
+ IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
+ for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) {
+ sefs[i] = new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
+ secondaryFieldAccessEvalFactories[i]);
+ }
+ AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
+ AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
+ new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
+ return asterixAssignOp;
+ }
+
+ protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec,
+ IBinaryComparatorFactory[] secondaryComparatorFactories, RecordDescriptor secondaryRecDesc,
+ AlgebricksPartitionConstraint partitionConstraints) {
+ int[] sortFields = new int[secondaryComparatorFactories.length];
+ for (int i = 0; i < secondaryComparatorFactories.length; i++) {
+ sortFields[i] = i;
+ }
+ ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
+ physOptConf.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories, secondaryRecDesc);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, partitionConstraints);
+ return sortOp;
+ }
+
+ protected ARecordType createSecondaryItemType(ARecordType externalItemType, boolean isRCFile)
+ throws AsterixException {
+
+ String[] fieldsNames = new String[numSecondaryKeys + numPrimaryKeys];
+ IAType[] fieldsTypes = new IAType[numSecondaryKeys + numPrimaryKeys];
+
+ //first create the secondary index fields
+ for (int i = 0; i < numSecondaryKeys; i++) {
+ fieldsNames[i] = secondaryKeyFields.get(i);
+ try {
+ fieldsTypes[i] = externalItemType.getFieldType(fieldsNames[i]);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ throw new AsterixException(e);
+ }
}
-protected ARecordType createSecondaryItemType(ARecordType externalItemType, boolean isRCFile) throws AsterixException
- {
-
- String[] fieldsNames = new String[numSecondaryKeys+numPrimaryKeys];
- IAType[] fieldsTypes = new IAType[numSecondaryKeys+numPrimaryKeys];
-
- //first create the secondary index fields
- for(int i=0; i<numSecondaryKeys; i++)
- {
- fieldsNames[i] = secondaryKeyFields.get(i);
- try {
- fieldsTypes[i] = externalItemType.getFieldType(fieldsNames[i]);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- throw new AsterixException(e);
- }
- }
-
- //second add RID fields (File name or number and byte location)
- if(AqlMetadataProvider.isOptimizeExternalIndexes())
- {
- fieldsNames[numSecondaryKeys] = "_file-number";
- fieldsTypes[numSecondaryKeys] = BuiltinType.ASTRING;
- }
- else
- {
- fieldsNames[numSecondaryKeys] = "_file-name";
- fieldsTypes[numSecondaryKeys] = BuiltinType.ASTRING;
- }
- fieldsNames[numSecondaryKeys+1] = "_byte-location";
- fieldsTypes[numSecondaryKeys+1] = BuiltinType.AINT64;
-
- if(isRCFile)
- {
- fieldsNames[numSecondaryKeys+2] = "_row-Number";
- fieldsTypes[numSecondaryKeys+2] = BuiltinType.AINT32;
- }
-
- //return type
- return new ARecordType(externalItemType.getTypeName(), fieldsNames, fieldsTypes, externalItemType.isOpen());
+ //second add RID fields (File name or number and byte location)
+ if (AqlMetadataProvider.isOptimizeExternalIndexes()) {
+ fieldsNames[numSecondaryKeys] = "_file-number";
+ fieldsTypes[numSecondaryKeys] = BuiltinType.ASTRING;
+ } else {
+ fieldsNames[numSecondaryKeys] = "_file-name";
+ fieldsTypes[numSecondaryKeys] = BuiltinType.ASTRING;
}
+ fieldsNames[numSecondaryKeys + 1] = "_byte-location";
+ fieldsTypes[numSecondaryKeys + 1] = BuiltinType.AINT64;
+
+ if (isRCFile) {
+ fieldsNames[numSecondaryKeys + 2] = "_row-Number";
+ fieldsTypes[numSecondaryKeys + 2] = BuiltinType.AINT32;
+ }
+
+ //return type
+ return new ARecordType(externalItemType.getTypeName(), fieldsNames, fieldsTypes, externalItemType.isOpen());
+ }
protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec) throws AlgebricksException {
// -Infinity
@@ -537,7 +522,7 @@
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new PrimaryIndexOperationTrackerProvider(
dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate()), false, searchCallbackFactory);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
primaryPartitionConstraint);
@@ -633,4 +618,3 @@
return asterixSelectOp;
}
}
-
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
index 6f64aa2..a3261e6 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
@@ -283,23 +283,23 @@
if (!isPartitioned) {
return new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- new SecondaryIndexOperationTrackerProvider(LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate());
} else {
return new PartitionedLSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- new SecondaryIndexOperationTrackerProvider(LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate());
}
}
-
+
@Override
- protected void setExternalSecondaryRecDescAndComparators(CompiledCreateIndexStatement createIndexStmt,
- AqlMetadataProvider metadataProvider) throws AlgebricksException, AsterixException {
- throw new AsterixException("Cannot create inverted index on external dataset due to composite RID Fields.");
- }
+ protected void setExternalSecondaryRecDescAndComparators(CompiledCreateIndexStatement createIndexStmt,
+ AqlMetadataProvider metadataProvider) throws AlgebricksException, AsterixException {
+ throw new AsterixException("Cannot create inverted index on external dataset due to composite RID Fields.");
+ }
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
index 6ce694c..ebfee66 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
@@ -102,10 +102,9 @@
new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
- LSMRTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AqlMetadataProvider.proposeLinearizer(
- keyType, secondaryComparatorFactories.length), storageProperties
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMRTreeIOOperationCallbackFactory.INSTANCE, AqlMetadataProvider.proposeLinearizer(keyType,
+ secondaryComparatorFactories.length), storageProperties
.getBloomFilterFalsePositiveRate()), localResourceFactoryProvider,
NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
@@ -161,165 +160,159 @@
}
@Override
- protected void setExternalSecondaryRecDescAndComparators(CompiledCreateIndexStatement createIndexStmt,
- AqlMetadataProvider metadataProvider) throws AlgebricksException, AsterixException {
- secondaryKeyFields = createIndexStmt.getKeyFields();
- if (numSecondaryKeys != 1) {
- throw new AsterixException(
- "Cannot use "
- + numSecondaryKeys
- + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
- }
- Pair<IAType, Boolean> spatialTypePair = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(0), itemType);
- IAType spatialType = spatialTypePair.first;
- anySecondaryKeyIsNullable = spatialTypePair.second;
- if (spatialType == null) {
- throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
- }
- int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
- numNestedSecondaryKeyFields = numDimensions * 2;
- secondaryFieldAccessEvalFactories = metadataProvider.getFormat().createMBRFactory(itemType, secondaryKeyFields.get(0),
- numPrimaryKeys, numDimensions);
- secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
- valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
- ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
- + numNestedSecondaryKeyFields];
- ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
- IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
- keyType = nestedKeyType.getTypeTag();
- for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
- ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(nestedKeyType);
- secondaryRecFields[i] = keySerde;
- secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
- nestedKeyType, true);
- secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
- valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
- }
+ protected void setExternalSecondaryRecDescAndComparators(CompiledCreateIndexStatement createIndexStmt,
+ AqlMetadataProvider metadataProvider) throws AlgebricksException, AsterixException {
+ secondaryKeyFields = createIndexStmt.getKeyFields();
+ if (numSecondaryKeys != 1) {
+ throw new AsterixException(
+ "Cannot use "
+ + numSecondaryKeys
+ + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
+ }
+ Pair<IAType, Boolean> spatialTypePair = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(0), itemType);
+ IAType spatialType = spatialTypePair.first;
+ anySecondaryKeyIsNullable = spatialTypePair.second;
+ if (spatialType == null) {
+ throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
+ }
+ int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
+ numNestedSecondaryKeyFields = numDimensions * 2;
+ secondaryFieldAccessEvalFactories = metadataProvider.getFormat().createMBRFactory(itemType,
+ secondaryKeyFields.get(0), numPrimaryKeys, numDimensions);
+ secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
+ valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+ ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
+ + numNestedSecondaryKeyFields];
+ ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
+ IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
+ keyType = nestedKeyType.getTypeTag();
+ for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
+ ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(nestedKeyType);
+ secondaryRecFields[i] = keySerde;
+ secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+ nestedKeyType, true);
+ secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
+ valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
+ }
- // Add serializers and comparators for primary index fields.
- for (int i = 0; i < numPrimaryKeys; i++) {
- secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecDesc.getFields()[i];
- secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryRecDesc.getTypeTraits()[i];
- }
- secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
- }
-
+ // Add serializers and comparators for primary index fields.
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecDesc.getFields()[i];
+ secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryRecDesc.getTypeTraits()[i];
+ }
+ secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
+ }
+
@Override
public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException {
JobSpecification spec = JobSpecificationUtils.createJobSpecification();
if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
- Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint> RIDScanOpAndConstraints;
- AlgebricksMetaOperatorDescriptor asterixAssignOp;
- try
- {
- //create external indexing scan operator
- RIDScanOpAndConstraints = createExternalIndexingOp(spec);
- //create assign operator
- asterixAssignOp = createExternalAssignOp(spec);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
- RIDScanOpAndConstraints.second);
- }
- catch(Exception e)
- {
- throw new AsterixException("Failed to create external index scanning and loading job");
- }
+ Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint> RIDScanOpAndConstraints;
+ AlgebricksMetaOperatorDescriptor asterixAssignOp;
+ try {
+ //create external indexing scan operator
+ RIDScanOpAndConstraints = createExternalIndexingOp(spec);
+ //create assign operator
+ asterixAssignOp = createExternalAssignOp(spec);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
+ RIDScanOpAndConstraints.second);
+ } catch (Exception e) {
+ throw new AsterixException("Failed to create external index scanning and loading job");
+ }
- // If any of the secondary fields are nullable, then add a select op that filters nulls.
- AlgebricksMetaOperatorDescriptor selectOp = null;
- if (anySecondaryKeyIsNullable) {
- selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys);
- }
+ // If any of the secondary fields are nullable, then add a select op that filters nulls.
+ AlgebricksMetaOperatorDescriptor selectOp = null;
+ if (anySecondaryKeyIsNullable) {
+ selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys);
+ }
- // Create secondary RTree bulk load op.
- AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
- TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(
- spec,
- numNestedSecondaryKeyFields,
- new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
- primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
- LSMRTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AqlMetadataProvider.proposeLinearizer(
- keyType, secondaryComparatorFactories.length), storageProperties
- .getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
- // Connect the operators.
- // Create a hash partitioning connector
- ExternalDatasetDetails edsd = (ExternalDatasetDetails)dataset.getDatasetDetails();
- IBinaryHashFunctionFactory[] hashFactories = null;
- if(edsd.getProperties().get(HDFSAdapterFactory.KEY_INPUT_FORMAT).trim().equals(HDFSAdapterFactory.INPUT_FORMAT_RC))
- {
- hashFactories = DatasetUtils.computeExternalDataKeysBinaryHashFunFactories(dataset, NonTaggedDataFormat.INSTANCE.getBinaryHashFunctionFactoryProvider());
- }
- else
- {
- hashFactories = DatasetUtils.computeExternalDataKeysBinaryHashFunFactories(dataset, NonTaggedDataFormat.INSTANCE.getBinaryHashFunctionFactoryProvider());
- }
- //select partitioning keys (always the first 2 after secondary keys)
- int[] keys = new int[2];
- keys[0] = numSecondaryKeys;
- keys[1] = numSecondaryKeys + 1;
+ // Create secondary RTree bulk load op.
+ AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
+ TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(
+ spec,
+ numNestedSecondaryKeyFields,
+ new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
+ primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMRTreeIOOperationCallbackFactory.INSTANCE, AqlMetadataProvider.proposeLinearizer(keyType,
+ secondaryComparatorFactories.length), storageProperties
+ .getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
+ // Connect the operators.
+ // Create a hash partitioning connector
+ ExternalDatasetDetails edsd = (ExternalDatasetDetails) dataset.getDatasetDetails();
+ IBinaryHashFunctionFactory[] hashFactories = null;
+ if (edsd.getProperties().get(HDFSAdapterFactory.KEY_INPUT_FORMAT).trim()
+ .equals(HDFSAdapterFactory.INPUT_FORMAT_RC)) {
+ hashFactories = DatasetUtils.computeExternalDataKeysBinaryHashFunFactories(dataset,
+ NonTaggedDataFormat.INSTANCE.getBinaryHashFunctionFactoryProvider());
+ } else {
+ hashFactories = DatasetUtils.computeExternalDataKeysBinaryHashFunFactories(dataset,
+ NonTaggedDataFormat.INSTANCE.getBinaryHashFunctionFactoryProvider());
+ }
+ //select partitioning keys (always the first 2 after secondary keys)
+ int[] keys = new int[2];
+ keys[0] = numSecondaryKeys;
+ keys[1] = numSecondaryKeys + 1;
- IConnectorDescriptor hashConn = new MToNPartitioningConnectorDescriptor(spec,
- new ExternalIndexHashPartitionComputerFactory(keys, hashFactories));
- spec.connect(new OneToOneConnectorDescriptor(spec), RIDScanOpAndConstraints.first, 0, asterixAssignOp, 0);
- if (anySecondaryKeyIsNullable) {
- spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
- spec.connect(hashConn, selectOp, 0, secondaryBulkLoadOp, 0);
- } else {
- spec.connect(hashConn, asterixAssignOp, 0, secondaryBulkLoadOp, 0);
- }
- spec.addRoot(secondaryBulkLoadOp);
- spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
- return spec;
- }
- else
- {
-
- // Create dummy key provider for feeding the primary index scan.
- AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
-
- // Create primary index scan op.
- BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
-
- // Assign op.
- AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, primaryScanOp,
- numNestedSecondaryKeyFields);
-
- // If any of the secondary fields are nullable, then add a select op that filters nulls.
- AlgebricksMetaOperatorDescriptor selectOp = null;
- if (anySecondaryKeyIsNullable) {
- selectOp = createFilterNullsSelectOp(spec, numNestedSecondaryKeyFields);
- }
-
- AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
- // Create secondary RTree bulk load op.
- TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(
- spec,
- numNestedSecondaryKeyFields,
- new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
- primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
- LSMRTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AqlMetadataProvider.proposeLinearizer(
- keyType, secondaryComparatorFactories.length), storageProperties
- .getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
-
- // Connect the operators.
- spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
- if (anySecondaryKeyIsNullable) {
- spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, secondaryBulkLoadOp, 0);
+ IConnectorDescriptor hashConn = new MToNPartitioningConnectorDescriptor(spec,
+ new ExternalIndexHashPartitionComputerFactory(keys, hashFactories));
+ spec.connect(new OneToOneConnectorDescriptor(spec), RIDScanOpAndConstraints.first, 0, asterixAssignOp, 0);
+ if (anySecondaryKeyIsNullable) {
+ spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
+ spec.connect(hashConn, selectOp, 0, secondaryBulkLoadOp, 0);
+ } else {
+ spec.connect(hashConn, asterixAssignOp, 0, secondaryBulkLoadOp, 0);
+ }
+ spec.addRoot(secondaryBulkLoadOp);
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ return spec;
} else {
- spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, secondaryBulkLoadOp, 0);
+
+ // Create dummy key provider for feeding the primary index scan.
+ AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
+
+ // Create primary index scan op.
+ BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
+
+ // Assign op.
+ AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, primaryScanOp,
+ numNestedSecondaryKeyFields);
+
+ // If any of the secondary fields are nullable, then add a select op that filters nulls.
+ AlgebricksMetaOperatorDescriptor selectOp = null;
+ if (anySecondaryKeyIsNullable) {
+ selectOp = createFilterNullsSelectOp(spec, numNestedSecondaryKeyFields);
+ }
+
+ AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
+ // Create secondary RTree bulk load op.
+ TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(
+ spec,
+ numNestedSecondaryKeyFields,
+ new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
+ primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMRTreeIOOperationCallbackFactory.INSTANCE, AqlMetadataProvider.proposeLinearizer(keyType,
+ secondaryComparatorFactories.length), storageProperties
+ .getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
+
+ // Connect the operators.
+ spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
+ if (anySecondaryKeyIsNullable) {
+ spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, secondaryBulkLoadOp, 0);
+ } else {
+ spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, secondaryBulkLoadOp, 0);
+ }
+ spec.addRoot(secondaryBulkLoadOp);
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ return spec;
}
- spec.addRoot(secondaryBulkLoadOp);
- spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
- return spec;
- }
}
}
-
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
index 3610478..7e7ffd9 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
@@ -17,52 +17,18 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
public class BaseOperationTracker implements ILSMOperationTracker {
protected final DatasetLifecycleManager datasetLifecycleManager;
- protected final ILSMIOOperationCallback ioOpCallback;
- protected long lastLSN;
- protected long firstLSN;
protected final int datasetID;
- public BaseOperationTracker(DatasetLifecycleManager datasetLifecycleManager,
- ILSMIOOperationCallbackFactory ioOpCallbackFactory, int datasetID) {
+ public BaseOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID) {
this.datasetLifecycleManager = datasetLifecycleManager;
- this.ioOpCallback = ioOpCallbackFactory == null ? NoOpIOOperationCallback.INSTANCE : ioOpCallbackFactory
- .createIOOperationCallback(this);
this.datasetID = datasetID;
- resetLSNs();
- }
-
- public ILSMIOOperationCallback getIOOperationCallback() {
- return ioOpCallback;
- }
-
- public long getLastLSN() {
- return lastLSN;
- }
-
- public long getFirstLSN() {
- return firstLSN;
- }
-
- public void updateLastLSN(long lastLSN) {
- if (firstLSN == -1) {
- firstLSN = lastLSN;
- }
- this.lastLSN = Math.max(this.lastLSN, lastLSN);
- }
-
- public void resetLSNs() {
- lastLSN = -1;
- firstLSN = -1;
}
@Override
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
index 617b6ff..6335fb1 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
@@ -26,7 +26,6 @@
import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
-import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
@@ -253,7 +252,7 @@
if (iInfo.isOpen) {
ILSMIndexAccessor accessor = (ILSMIndexAccessor) iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
- accessor.scheduleFlush(((BaseOperationTracker) iInfo.index.getOperationTracker()).getIOOperationCallback());
+ accessor.scheduleFlush(iInfo.index.getIOOperationCallback());
}
// Wait for the above flush op.
while (dsInfo.numActiveIOOps > 0) {
@@ -316,8 +315,7 @@
synchronized (datasetOpTrackers) {
ILSMOperationTracker opTracker = datasetOpTrackers.get(datasetID);
if (opTracker == null) {
- opTracker = new PrimaryIndexOperationTracker(this, datasetID,
- LSMBTreeIOOperationCallbackFactory.INSTANCE);
+ opTracker = new PrimaryIndexOperationTracker(this, datasetID);
datasetOpTrackers.put(datasetID, opTracker);
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
index 2ed4b0ec..92a33cd 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -22,7 +22,6 @@
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexInternal;
@@ -33,9 +32,8 @@
// Number of active operations on an ILSMIndex instance.
private final AtomicInteger numActiveOperations;
- public PrimaryIndexOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID,
- ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
- super(datasetLifecycleManager, ioOpCallbackFactory, datasetID);
+ public PrimaryIndexOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID) {
+ super(datasetLifecycleManager, datasetID);
this.numActiveOperations = new AtomicInteger();
}
@@ -88,8 +86,7 @@
for (ILSMIndex lsmIndex : indexes) {
ILSMIndexAccessor accessor = (ILSMIndexAccessor) lsmIndex.createAccessor(
NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- accessor.scheduleFlush(((BaseOperationTracker) lsmIndex.getOperationTracker())
- .getIOOperationCallback());
+ accessor.scheduleFlush(lsmIndex.getIOOperationCallback());
}
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index da08cd8..4ebd921 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -17,7 +17,6 @@
import java.util.List;
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
@@ -29,10 +28,11 @@
public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationCallback {
- protected final BaseOperationTracker opTracker;
+ protected long firstLSN;
+ protected long lastLSN;
- public AbstractLSMIOOperationCallback(BaseOperationTracker opTracker) {
- this.opTracker = opTracker;
+ public AbstractLSMIOOperationCallback() {
+ resetLSNs();
}
@Override
@@ -42,7 +42,7 @@
@Override
public void afterFinalize(ILSMComponent newComponent) {
- opTracker.resetLSNs();
+ resetLSNs();
}
public abstract long getComponentLSN(List<ILSMComponent> oldComponents) throws HyracksDataException;
@@ -80,4 +80,25 @@
bufferCache.unpin(metadataPage);
}
}
+
+ protected void resetLSNs() {
+ firstLSN = -1;
+ lastLSN = -1;
+ }
+
+ public void updateLastLSN(long lastLSN) {
+ if (firstLSN == -1) {
+ firstLSN = lastLSN;
+ }
+ this.lastLSN = Math.max(this.lastLSN, lastLSN);
+ }
+
+ public long getFirstLSN() {
+ return firstLSN;
+ }
+
+ public long getLastLSN() {
+ return lastLSN;
+ }
+
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
index b6025cb..d2eb5ba 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -25,8 +25,8 @@
public class LSMBTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
- public LSMBTreeIOOperationCallback(BaseOperationTracker opTracker) {
- super(opTracker);
+ public LSMBTreeIOOperationCallback() {
+ super();
}
@Override
@@ -42,7 +42,7 @@
public long getComponentLSN(List<ILSMComponent> diskComponents) throws HyracksDataException {
if (diskComponents == null) {
// Implies a flush IO operation.
- return opTracker.getLastLSN();
+ return lastLSN;
}
// Get max LSN from the diskComponents. Implies a merge IO operation or Recovery operation.
long maxLSN = -1;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
index 92ba9ec..1028015 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
@@ -15,7 +15,6 @@
package edu.uci.ics.asterix.common.ioopcallbacks;
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
@@ -29,7 +28,7 @@
}
@Override
- public ILSMIOOperationCallback createIOOperationCallback(Object syncObj) {
- return new LSMBTreeIOOperationCallback((BaseOperationTracker) syncObj);
+ public ILSMIOOperationCallback createIOOperationCallback() {
+ return new LSMBTreeIOOperationCallback();
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index 4f99ae6..60d4af1 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -17,15 +17,14 @@
import java.util.List;
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexDiskComponent;
public class LSMInvertedIndexIOOperationCallback extends AbstractLSMIOOperationCallback {
- public LSMInvertedIndexIOOperationCallback(BaseOperationTracker opTracker) {
- super(opTracker);
+ public LSMInvertedIndexIOOperationCallback() {
+ super();
}
@Override
@@ -41,7 +40,7 @@
public long getComponentLSN(List<ILSMComponent> diskComponents) throws HyracksDataException {
if (diskComponents == null) {
// Implies a flush IO operation.
- return opTracker.getLastLSN();
+ return lastLSN;
}
// Get max LSN from the diskComponents. Implies a merge IO operation or Recovery operation.
long maxLSN = -1;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
index c20cdb3..5dc0c0b 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
@@ -15,7 +15,6 @@
package edu.uci.ics.asterix.common.ioopcallbacks;
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
@@ -29,7 +28,7 @@
}
@Override
- public ILSMIOOperationCallback createIOOperationCallback(Object syncObj) {
- return new LSMInvertedIndexIOOperationCallback((BaseOperationTracker) syncObj);
+ public ILSMIOOperationCallback createIOOperationCallback() {
+ return new LSMInvertedIndexIOOperationCallback();
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
index cd7b7a0..770d514 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -17,15 +17,14 @@
import java.util.List;
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeDiskComponent;
public class LSMRTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
- public LSMRTreeIOOperationCallback(BaseOperationTracker opTracker) {
- super(opTracker);
+ public LSMRTreeIOOperationCallback() {
+ super();
}
@Override
@@ -42,7 +41,7 @@
public long getComponentLSN(List<ILSMComponent> diskComponents) throws HyracksDataException {
if (diskComponents == null) {
// Implies a flush IO operation.
- return opTracker.getLastLSN();
+ return lastLSN;
}
// Get max LSN from the diskComponents. Implies a merge IO operation or Recovery operation.
long maxLSN = -1;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
index 0cd2539..841a1d5 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
@@ -15,7 +15,6 @@
package edu.uci.ics.asterix.common.ioopcallbacks;
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
@@ -29,7 +28,7 @@
}
@Override
- public ILSMIOOperationCallback createIOOperationCallback(Object syncObj) {
- return new LSMRTreeIOOperationCallback((BaseOperationTracker) syncObj);
+ public ILSMIOOperationCallback createIOOperationCallback() {
+ return new LSMRTreeIOOperationCallback();
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
index 2f522b9..2638c9f 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
@@ -18,7 +18,6 @@
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -44,14 +43,6 @@
public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID);
- public ILSMIOOperationCallbackProvider getLSMBTreeIOOperationCallbackProvider(boolean isPrimary);
-
- public ILSMIOOperationCallbackProvider getLSMRTreeIOOperationCallbackProvider();
-
- public ILSMIOOperationCallbackProvider getLSMInvertedIndexIOOperationCallbackProvider();
-
- public ILSMIOOperationCallbackProvider getNoOpIOOperationCallbackProvider();
-
public ILSMIOOperationScheduler getLSMIOScheduler();
public ILocalResourceRepository getLocalResourceRepository();
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index ed89cfc..0c64497 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -336,11 +336,12 @@
AsterixRuntimeComponentsProvider rtcProvider = AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
ILSMOperationTracker opTracker = index.isPrimaryIndex() ? runtimeContext.getLSMBTreeOperationTracker(index
.getDatasetId().getId()) : new BaseOperationTracker((DatasetLifecycleManager) indexLifecycleManager,
- LSMBTreeIOOperationCallbackFactory.INSTANCE, index.getDatasetId().getId());
+ index.getDatasetId().getId());
if (create) {
lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider, typeTraits,
comparatorFactories, bloomFilterKeyFields, runtimeContext.getBloomFilterFalsePositiveRate(),
- runtimeContext.getLSMMergePolicy(), opTracker, runtimeContext.getLSMIOScheduler(), rtcProvider);
+ runtimeContext.getLSMMergePolicy(), opTracker, runtimeContext.getLSMIOScheduler(),
+ LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback());
lsmBtree.create();
resourceID = runtimeContext.getResourceIdFactory().createId();
ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(typeTraits,
@@ -359,7 +360,7 @@
typeTraits, comparatorFactories, bloomFilterKeyFields,
runtimeContext.getBloomFilterFalsePositiveRate(), runtimeContext.getLSMMergePolicy(),
opTracker, runtimeContext.getLSMIOScheduler(),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
+ LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback());
indexLifecycleManager.register(resourceID, lsmBtree);
}
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 078b340d..de94045 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -257,7 +257,7 @@
public IResultSerializerFactoryProvider getResultSerializerFactoryProvider() {
return resultSerializerFactoryProvider;
}
-
+
public String getPropertyValue(String propertyName) {
return config.get(propertyName);
}
@@ -275,12 +275,12 @@
}
public static boolean isOptimizeExternalIndexes() {
- return optimizeExternalIndexes;
- }
-
+ return optimizeExternalIndexes;
+ }
+
public static void setOptimizeExternalIndexes(boolean optimizeExternalIndexes) {
- AqlMetadataProvider.optimizeExternalIndexes = optimizeExternalIndexes;
- }
+ AqlMetadataProvider.optimizeExternalIndexes = optimizeExternalIndexes;
+ }
@Override
public AqlDataSource findDataSource(AqlSourceId id) throws AlgebricksException {
@@ -419,215 +419,206 @@
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataScanner, constraint);
}
-
+
@SuppressWarnings("rawtypes")
- public Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataIndexingRuntime(
- JobSpecification jobSpec, IAType itemType, Dataset dataset, IDataFormat format)
- throws AlgebricksException {
- IGenericDatasetAdapterFactory adapterFactory;
- IDatasourceAdapter adapter;
- String adapterName;
- DatasourceAdapter adapterEntity;
- String adapterFactoryClassname;
- ExternalDatasetDetails datasetDetails = null;
- try {
- datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
- adapterName = datasetDetails.getAdapter();
- adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
- adapterName);
- if (adapterEntity != null) {
- adapterFactoryClassname = adapterEntity.getClassname();
- adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
- } else {
- adapterFactoryClassname = adapterFactoryMapping.get(adapterName);
- if (adapterFactoryClassname == null) {
- throw new AlgebricksException(" Unknown adapter :" + adapterName);
- }
- adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
- }
+ public Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataIndexingRuntime(
+ JobSpecification jobSpec, IAType itemType, Dataset dataset, IDataFormat format) throws AlgebricksException {
+ IGenericDatasetAdapterFactory adapterFactory;
+ IDatasourceAdapter adapter;
+ String adapterName;
+ DatasourceAdapter adapterEntity;
+ String adapterFactoryClassname;
+ ExternalDatasetDetails datasetDetails = null;
+ try {
+ datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+ adapterName = datasetDetails.getAdapter();
+ adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
+ adapterName);
+ if (adapterEntity != null) {
+ adapterFactoryClassname = adapterEntity.getClassname();
+ adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+ } else {
+ adapterFactoryClassname = adapterFactoryMapping.get(adapterName);
+ if (adapterFactoryClassname == null) {
+ throw new AlgebricksException(" Unknown adapter :" + adapterName);
+ }
+ adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+ }
- adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createIndexingAdapter(
- wrapProperties(datasetDetails.getProperties()), itemType, null);
- } catch (AlgebricksException ae) {
- throw ae;
- } catch (Exception e) {
- e.printStackTrace();
- throw new AlgebricksException("Unable to create adapter " + e);
- }
- if (!(adapter.getAdapterType().equals(IDatasourceAdapter.AdapterType.READ) || adapter.getAdapterType().equals(
- IDatasourceAdapter.AdapterType.READ_WRITE))) {
- throw new AlgebricksException("external dataset adapter does not support read operation");
- }
- ARecordType rt = (ARecordType) itemType;
- ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
- RecordDescriptor indexerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
- ExternalDataIndexingOperatorDescriptor dataIndexScanner = null;
- List<ExternalFile> files = null;
- HashMap<String, Integer> filesNumbers = null;
- if(optimizeExternalIndexes)
- {
- try {
- files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
- } catch (MetadataException e) {
- e.printStackTrace();
- throw new AlgebricksException("Unable to get list of external files from metadata " + e);
- }
-
- filesNumbers = new HashMap<String,Integer>();
- for(int i=0; i< files.size(); i++)
- {
- filesNumbers.put(files.get(i).getFileName(), files.get(i).getFileNumber());
- }
-
- dataIndexScanner = new ExternalDataIndexingOperatorDescriptor(jobSpec,
- wrapPropertiesEmpty(datasetDetails.getProperties()), rt, indexerDesc, adapterFactory,filesNumbers);
- }
- else
- {
- dataIndexScanner = new ExternalDataIndexingOperatorDescriptor(jobSpec,
- wrapPropertiesEmpty(datasetDetails.getProperties()), rt, indexerDesc, adapterFactory,filesNumbers);
- }
- AlgebricksPartitionConstraint constraint;
- try {
- constraint = adapter.getPartitionConstraint();
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
- return new Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint>(dataIndexScanner, constraint);
- }
-
- public ArrayList<ExternalFile> getExternalDatasetFiles(Dataset dataset) throws AlgebricksException
- {
- ArrayList<ExternalFile> files = new ArrayList<ExternalFile>();
- if(dataset.getDatasetType() != DatasetType.EXTERNAL)
- {
- throw new AlgebricksException("Can only get external dataset files");
- }
- ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails)dataset.getDatasetDetails();
- IGenericDatasetAdapterFactory adapterFactory;
- IDatasourceAdapter adapter;
- String adapterName;
- DatasourceAdapter adapterEntity;
- String adapterFactoryClassname;
- try {
- adapterName = datasetDetails.getAdapter();
- adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
- adapterName);
- if (adapterEntity != null) {
- adapterFactoryClassname = adapterEntity.getClassname();
- adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
- } else {
- adapterFactoryClassname = adapterFactoryMapping.get(adapterName);
- if (adapterFactoryClassname == null) {
- throw new AlgebricksException(" Unknown adapter :" + adapterName);
- }
- adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
- }
+ adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createIndexingAdapter(
+ wrapProperties(datasetDetails.getProperties()), itemType, null);
+ } catch (AlgebricksException ae) {
+ throw ae;
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new AlgebricksException("Unable to create adapter " + e);
+ }
+ if (!(adapter.getAdapterType().equals(IDatasourceAdapter.AdapterType.READ) || adapter.getAdapterType().equals(
+ IDatasourceAdapter.AdapterType.READ_WRITE))) {
+ throw new AlgebricksException("external dataset adapter does not support read operation");
+ }
+ ARecordType rt = (ARecordType) itemType;
+ ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
+ RecordDescriptor indexerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
+ ExternalDataIndexingOperatorDescriptor dataIndexScanner = null;
+ List<ExternalFile> files = null;
+ HashMap<String, Integer> filesNumbers = null;
+ if (optimizeExternalIndexes) {
+ try {
+ files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
+ } catch (MetadataException e) {
+ e.printStackTrace();
+ throw new AlgebricksException("Unable to get list of external files from metadata " + e);
+ }
- adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(
- wrapProperties(datasetDetails.getProperties()), null);
- }
- catch (Exception e) {
- e.printStackTrace();
- throw new AlgebricksException("Unable to create adapter " + e);
- }
-
- try {
- ArrayList<FileStatus> fileStatuses = ExternalDataFilesMetadataProvider.getHDFSFileStatus((AbstractDatasourceAdapter) adapter);
- for(int i=0; i<fileStatuses.size(); i++)
- {
- files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(), new Date(fileStatuses.get(i).getModificationTime()),
- fileStatuses.get(i).getLen(),
- fileStatuses.get(i).getPath().toUri().getPath(),
- i));
- }
- return files;
- } catch (IOException e) {
- e.printStackTrace();
- throw new AlgebricksException("Unable to get list of HDFS files " + e);
- }
- }
+ filesNumbers = new HashMap<String, Integer>();
+ for (int i = 0; i < files.size(); i++) {
+ filesNumbers.put(files.get(i).getFileName(), files.get(i).getFileNumber());
+ }
- @SuppressWarnings("rawtypes")
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataAccesByRIDRuntime(
- JobSpecification jobSpec, Dataset dataset, Index secondaryIndex)
- throws AlgebricksException {
- IAType itemType = null;
- try {
- itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(), dataset.getItemTypeName()).getDatatype();
- } catch (MetadataException e) {
- e.printStackTrace();
- throw new AlgebricksException("Unable to get item type from metadata " + e);
- }
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Can only scan datasets of records.");
- }
+ dataIndexScanner = new ExternalDataIndexingOperatorDescriptor(jobSpec,
+ wrapPropertiesEmpty(datasetDetails.getProperties()), rt, indexerDesc, adapterFactory, filesNumbers);
+ } else {
+ dataIndexScanner = new ExternalDataIndexingOperatorDescriptor(jobSpec,
+ wrapPropertiesEmpty(datasetDetails.getProperties()), rt, indexerDesc, adapterFactory, filesNumbers);
+ }
+ AlgebricksPartitionConstraint constraint;
+ try {
+ constraint = adapter.getPartitionConstraint();
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
+ return new Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint>(dataIndexScanner,
+ constraint);
+ }
- ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails)dataset.getDatasetDetails();
- IGenericDatasetAdapterFactory adapterFactory;
- IDatasourceAdapter adapter;
- String adapterName;
- DatasourceAdapter adapterEntity;
- String adapterFactoryClassname;
- try {
- adapterName = datasetDetails.getAdapter();
- adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
- adapterName);
- if (adapterEntity != null) {
- adapterFactoryClassname = adapterEntity.getClassname();
- adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
- } else {
- adapterFactoryClassname = adapterFactoryMapping.get(adapterName);
- if (adapterFactoryClassname == null) {
- throw new AlgebricksException(" Unknown adapter :" + adapterName);
- }
- adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
- }
+ public ArrayList<ExternalFile> getExternalDatasetFiles(Dataset dataset) throws AlgebricksException {
+ ArrayList<ExternalFile> files = new ArrayList<ExternalFile>();
+ if (dataset.getDatasetType() != DatasetType.EXTERNAL) {
+ throw new AlgebricksException("Can only get external dataset files");
+ }
+ ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+ IGenericDatasetAdapterFactory adapterFactory;
+ IDatasourceAdapter adapter;
+ String adapterName;
+ DatasourceAdapter adapterEntity;
+ String adapterFactoryClassname;
+ try {
+ adapterName = datasetDetails.getAdapter();
+ adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
+ adapterName);
+ if (adapterEntity != null) {
+ adapterFactoryClassname = adapterEntity.getClassname();
+ adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+ } else {
+ adapterFactoryClassname = adapterFactoryMapping.get(adapterName);
+ if (adapterFactoryClassname == null) {
+ throw new AlgebricksException(" Unknown adapter :" + adapterName);
+ }
+ adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+ }
- adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(
- wrapProperties(datasetDetails.getProperties()), itemType);
- } catch (AlgebricksException ae) {
- throw ae;
- } catch (Exception e) {
- e.printStackTrace();
- throw new AlgebricksException("Unable to create adapter " + e);
- }
+ adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(
+ wrapProperties(datasetDetails.getProperties()), null);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new AlgebricksException("Unable to create adapter " + e);
+ }
- if (!(adapter.getAdapterType().equals(IDatasourceAdapter.AdapterType.READ) || adapter.getAdapterType().equals(
- IDatasourceAdapter.AdapterType.READ_WRITE))) {
- throw new AlgebricksException("external dataset adapter does not support read operation");
- }
- IDataFormat format = NonTaggedDataFormat.INSTANCE;
- ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
- RecordDescriptor outRecDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
+ try {
+ ArrayList<FileStatus> fileStatuses = ExternalDataFilesMetadataProvider
+ .getHDFSFileStatus((AbstractDatasourceAdapter) adapter);
+ for (int i = 0; i < fileStatuses.size(); i++) {
+ files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(), new Date(fileStatuses
+ .get(i).getModificationTime()), fileStatuses.get(i).getLen(), fileStatuses.get(i).getPath()
+ .toUri().getPath(), i));
+ }
+ return files;
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw new AlgebricksException("Unable to get list of HDFS files " + e);
+ }
+ }
- ExternalDataAccessByRIDOperatorDescriptor dataAccessOperator = null;
- if(optimizeExternalIndexes)
- {
- //create the hashmap
- List<ExternalFile> files=null;
- try {
- files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
- } catch (MetadataException e) {
- e.printStackTrace();
- throw new AlgebricksException("Couldn't get file names for access by optimized RIDs",e);
- }
- HashMap<Integer, String> filesMapping = new HashMap<Integer, String>();
- for(int i=0; i < files.size(); i++)
- {
- filesMapping.put(files.get(i).getFileNumber(), files.get(i).getFileName());
- }
- dataAccessOperator = new ExternalDataAccessByRIDOperatorDescriptor(jobSpec, wrapPropertiesEmpty(datasetDetails.getProperties()),
- itemType, outRecDesc, adapterFactory, filesMapping);
- }
- else
- {
- dataAccessOperator = new ExternalDataAccessByRIDOperatorDescriptor(jobSpec, wrapPropertiesEmpty(datasetDetails.getProperties()),
- itemType, outRecDesc, adapterFactory, null);
- }
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints = splitProviderAndPartitionConstraintsForExternalDataset(dataset.getDataverseName(),dataset.getDatasetName(),secondaryIndex.getIndexName());
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataAccessOperator, splitsAndConstraints.second);
- }
+ @SuppressWarnings("rawtypes")
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataAccesByRIDRuntime(
+ JobSpecification jobSpec, Dataset dataset, Index secondaryIndex) throws AlgebricksException {
+ IAType itemType = null;
+ try {
+ itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(),
+ dataset.getItemTypeName()).getDatatype();
+ } catch (MetadataException e) {
+ e.printStackTrace();
+ throw new AlgebricksException("Unable to get item type from metadata " + e);
+ }
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException("Can only scan datasets of records.");
+ }
+
+ ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+ IGenericDatasetAdapterFactory adapterFactory;
+ IDatasourceAdapter adapter;
+ String adapterName;
+ DatasourceAdapter adapterEntity;
+ String adapterFactoryClassname;
+ try {
+ adapterName = datasetDetails.getAdapter();
+ adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
+ adapterName);
+ if (adapterEntity != null) {
+ adapterFactoryClassname = adapterEntity.getClassname();
+ adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+ } else {
+ adapterFactoryClassname = adapterFactoryMapping.get(adapterName);
+ if (adapterFactoryClassname == null) {
+ throw new AlgebricksException(" Unknown adapter :" + adapterName);
+ }
+ adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+ }
+
+ adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(
+ wrapProperties(datasetDetails.getProperties()), itemType);
+ } catch (AlgebricksException ae) {
+ throw ae;
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new AlgebricksException("Unable to create adapter " + e);
+ }
+
+ if (!(adapter.getAdapterType().equals(IDatasourceAdapter.AdapterType.READ) || adapter.getAdapterType().equals(
+ IDatasourceAdapter.AdapterType.READ_WRITE))) {
+ throw new AlgebricksException("external dataset adapter does not support read operation");
+ }
+ IDataFormat format = NonTaggedDataFormat.INSTANCE;
+ ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
+ RecordDescriptor outRecDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
+
+ ExternalDataAccessByRIDOperatorDescriptor dataAccessOperator = null;
+ if (optimizeExternalIndexes) {
+ //create the hashmap
+ List<ExternalFile> files = null;
+ try {
+ files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
+ } catch (MetadataException e) {
+ e.printStackTrace();
+ throw new AlgebricksException("Couldn't get file names for access by optimized RIDs", e);
+ }
+ HashMap<Integer, String> filesMapping = new HashMap<Integer, String>();
+ for (int i = 0; i < files.size(); i++) {
+ filesMapping.put(files.get(i).getFileNumber(), files.get(i).getFileName());
+ }
+ dataAccessOperator = new ExternalDataAccessByRIDOperatorDescriptor(jobSpec,
+ wrapPropertiesEmpty(datasetDetails.getProperties()), itemType, outRecDesc, adapterFactory,
+ filesMapping);
+ } else {
+ dataAccessOperator = new ExternalDataAccessByRIDOperatorDescriptor(jobSpec,
+ wrapPropertiesEmpty(datasetDetails.getProperties()), itemType, outRecDesc, adapterFactory, null);
+ }
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints = splitProviderAndPartitionConstraintsForExternalDataset(
+ dataset.getDataverseName(), dataset.getDatasetName(), secondaryIndex.getIndexName());
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataAccessOperator,
+ splitsAndConstraints.second);
+ }
@SuppressWarnings("rawtypes")
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildScannerRuntime(JobSpecification jobSpec,
@@ -730,142 +721,141 @@
}
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
- List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
- JobGenContext context, boolean retainInput, Dataset dataset, String indexName, int[] lowKeyFields,
- int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive, Object implConfig)
- throws AlgebricksException {
- boolean isSecondary = true;
- if(dataset.getDatasetType() == DatasetType.EXTERNAL){
- try {
- int numPrimaryKeys = DatasetUtils.getExternalRIDSize(dataset);
- RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
- int numKeys = numPrimaryKeys;;
- ITypeTraits[] typeTraits = null;
- int[] bloomFilterKeyFields;
- Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
- dataset.getDatasetName(), indexName);
- int numSecondaryKeys = secondaryIndex.getKeyFieldNames().size();
- numKeys += numSecondaryKeys;
- int keysStartIndex = outputVars.size() - numKeys;
- typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex, numKeys, typeEnv, context);
- bloomFilterKeyFields = new int[numSecondaryKeys];
- for (int i = 0; i < numSecondaryKeys; i++) {
- bloomFilterKeyFields[i] = i;
- }
- IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
- outputVars, keysStartIndex, numKeys, typeEnv, context);
- IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
- try {
- spPc = splitProviderAndPartitionConstraintsForExternalDataset(dataset.getDataverseName(),
- dataset.getDatasetName(), indexName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
- ISearchOperationCallbackFactory searchCallbackFactory = null;
- searchCallbackFactory = new SecondaryIndexSearchOperationCallbackFactory();
- AsterixRuntimeComponentsProvider rtcProvider = AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
- BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
- appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), spPc.first,
- typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields,
- lowKeyInclusive, highKeyInclusive, new LSMBTreeDataflowHelperFactory(
- new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), rtcProvider,
- isSecondary ? new SecondaryIndexOperationTrackerProvider(
- LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId())
- : new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()), rtcProvider,
- rtcProvider, storageProperties.getBloomFilterFalsePositiveRate()), retainInput,
- searchCallbackFactory);
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
- } catch (MetadataException me) {
- throw new AlgebricksException(me);
- }
- }
- else
- {
- try {
- Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
- dataset.getDatasetName(), dataset.getDatasetName());
- if (primaryIndex != null) {
- isSecondary = !indexName.equals(primaryIndex.getIndexName());
- }
- int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
- RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
- int numKeys = numPrimaryKeys;
- int keysStartIndex = outputRecDesc.getFieldCount() - numKeys - 1;
- ITypeTraits[] typeTraits = null;
- int[] bloomFilterKeyFields;
- if (isSecondary) {
- Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
- dataset.getDatasetName(), indexName);
- int numSecondaryKeys = secondaryIndex.getKeyFieldNames().size();
- numKeys += numSecondaryKeys;
- keysStartIndex = outputVars.size() - numKeys;
- typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex, numKeys, typeEnv, context);
- bloomFilterKeyFields = new int[numSecondaryKeys];
- for (int i = 0; i < numSecondaryKeys; i++) {
- bloomFilterKeyFields[i] = i;
- }
- } else {
- typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex, numKeys + 1, typeEnv,
- context);
- bloomFilterKeyFields = new int[numPrimaryKeys];
- for (int i = 0; i < numPrimaryKeys; i++) {
- bloomFilterKeyFields[i] = i;
- }
- }
- IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
- outputVars, keysStartIndex, numKeys, typeEnv, context);
-
- IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
- try {
- spPc = splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(),
- dataset.getDatasetName(), indexName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
-
- ISearchOperationCallbackFactory searchCallbackFactory = null;
- if (isSecondary) {
- searchCallbackFactory = new SecondaryIndexSearchOperationCallbackFactory();
- } else {
- JobId jobId = ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getJobId();
- int datasetId = dataset.getDatasetId();
- int[] primaryKeyFields = new int[numPrimaryKeys];
- for (int i = 0; i < numPrimaryKeys; i++) {
- primaryKeyFields[i] = i;
- }
-
- AqlMetadataImplConfig aqlMetadataImplConfig = (AqlMetadataImplConfig) implConfig;
- ITransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
- if (aqlMetadataImplConfig != null && aqlMetadataImplConfig.isInstantLock()) {
- searchCallbackFactory = new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, datasetId,
- primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
- } else {
- searchCallbackFactory = new PrimaryIndexSearchOperationCallbackFactory(jobId, datasetId,
- primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
+ List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
+ JobGenContext context, boolean retainInput, Dataset dataset, String indexName, int[] lowKeyFields,
+ int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive, Object implConfig)
+ throws AlgebricksException {
+ boolean isSecondary = true;
+ if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+ try {
+ int numPrimaryKeys = DatasetUtils.getExternalRIDSize(dataset);
+ RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
+ int numKeys = numPrimaryKeys;;
+ ITypeTraits[] typeTraits = null;
+ int[] bloomFilterKeyFields;
+ Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+ dataset.getDatasetName(), indexName);
+ int numSecondaryKeys = secondaryIndex.getKeyFieldNames().size();
+ numKeys += numSecondaryKeys;
+ int keysStartIndex = outputVars.size() - numKeys;
+ typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex, numKeys, typeEnv, context);
+ bloomFilterKeyFields = new int[numSecondaryKeys];
+ for (int i = 0; i < numSecondaryKeys; i++) {
+ bloomFilterKeyFields[i] = i;
}
+ IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
+ outputVars, keysStartIndex, numKeys, typeEnv, context);
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
+ try {
+ spPc = splitProviderAndPartitionConstraintsForExternalDataset(dataset.getDataverseName(),
+ dataset.getDatasetName(), indexName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
+ ISearchOperationCallbackFactory searchCallbackFactory = null;
+ searchCallbackFactory = new SecondaryIndexSearchOperationCallbackFactory();
+ AsterixRuntimeComponentsProvider rtcProvider = AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
+ BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
+ appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+ spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields,
+ lowKeyInclusive, highKeyInclusive, new LSMBTreeDataflowHelperFactory(
+ new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), rtcProvider,
+ isSecondary ? new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId())
+ : new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ rtcProvider, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+ storageProperties.getBloomFilterFalsePositiveRate()), retainInput,
+ searchCallbackFactory);
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
+ } catch (MetadataException me) {
+ throw new AlgebricksException(me);
}
- AsterixRuntimeComponentsProvider rtcProvider = AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
- BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
- appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), spPc.first,
- typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields,
- lowKeyInclusive, highKeyInclusive, new LSMBTreeDataflowHelperFactory(
- new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), rtcProvider,
- isSecondary ? new SecondaryIndexOperationTrackerProvider(
- LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId())
- : new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()), rtcProvider,
- rtcProvider, storageProperties.getBloomFilterFalsePositiveRate()), retainInput,
- searchCallbackFactory);
+ } else {
+ try {
+ Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+ dataset.getDatasetName(), dataset.getDatasetName());
+ if (primaryIndex != null) {
+ isSecondary = !indexName.equals(primaryIndex.getIndexName());
+ }
+ int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
+ RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
+ int numKeys = numPrimaryKeys;
+ int keysStartIndex = outputRecDesc.getFieldCount() - numKeys - 1;
+ ITypeTraits[] typeTraits = null;
+ int[] bloomFilterKeyFields;
+ if (isSecondary) {
+ Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+ dataset.getDatasetName(), indexName);
+ int numSecondaryKeys = secondaryIndex.getKeyFieldNames().size();
+ numKeys += numSecondaryKeys;
+ keysStartIndex = outputVars.size() - numKeys;
+ typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex, numKeys, typeEnv,
+ context);
+ bloomFilterKeyFields = new int[numSecondaryKeys];
+ for (int i = 0; i < numSecondaryKeys; i++) {
+ bloomFilterKeyFields[i] = i;
+ }
+ } else {
+ typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex, numKeys + 1, typeEnv,
+ context);
+ bloomFilterKeyFields = new int[numPrimaryKeys];
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ bloomFilterKeyFields[i] = i;
+ }
+ }
+ IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
+ outputVars, keysStartIndex, numKeys, typeEnv, context);
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
+ try {
+ spPc = splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(),
+ dataset.getDatasetName(), indexName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
- } catch (MetadataException me) {
- throw new AlgebricksException(me);
- }
- }
- }
-
+ ISearchOperationCallbackFactory searchCallbackFactory = null;
+ if (isSecondary) {
+ searchCallbackFactory = new SecondaryIndexSearchOperationCallbackFactory();
+ } else {
+ JobId jobId = ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getJobId();
+ int datasetId = dataset.getDatasetId();
+ int[] primaryKeyFields = new int[numPrimaryKeys];
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ primaryKeyFields[i] = i;
+ }
+
+ AqlMetadataImplConfig aqlMetadataImplConfig = (AqlMetadataImplConfig) implConfig;
+ ITransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+ if (aqlMetadataImplConfig != null && aqlMetadataImplConfig.isInstantLock()) {
+ searchCallbackFactory = new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, datasetId,
+ primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
+ } else {
+ searchCallbackFactory = new PrimaryIndexSearchOperationCallbackFactory(jobId, datasetId,
+ primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
+ }
+ }
+ AsterixRuntimeComponentsProvider rtcProvider = AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
+ BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
+ appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+ spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields,
+ lowKeyInclusive, highKeyInclusive, new LSMBTreeDataflowHelperFactory(
+ new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), rtcProvider,
+ isSecondary ? new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId())
+ : new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ rtcProvider, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+ storageProperties.getBloomFilterFalsePositiveRate()), retainInput,
+ searchCallbackFactory);
+
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
+
+ } catch (MetadataException me) {
+ throw new AlgebricksException(me);
+ }
+ }
+ }
+
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(JobSpecification jobSpec,
List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
JobGenContext context, boolean retainInput, Dataset dataset, String indexName, int[] keyFields)
@@ -923,11 +913,11 @@
valueProviderFactories, RTreePolicyType.RTREE, primaryComparatorFactories,
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- new SecondaryIndexOperationTrackerProvider(LSMRTreeIOOperationCallbackFactory.INSTANCE,
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, proposeLinearizer(
- nestedKeyType.getTypeTag(), comparatorFactories.length),
- storageProperties.getBloomFilterFalsePositiveRate()), retainInput, searchCallbackFactory);
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMRTreeIOOperationCallbackFactory.INSTANCE, proposeLinearizer(nestedKeyType.getTypeTag(),
+ comparatorFactories.length), storageProperties.getBloomFilterFalsePositiveRate()),
+ retainInput, searchCallbackFactory);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeSearchOp, spPc.second);
} catch (MetadataException me) {
@@ -1083,7 +1073,7 @@
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate()), NoOpOperationCallbackFactory.INSTANCE);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
splitsAndConstraint.second);
@@ -1151,7 +1141,7 @@
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
+ LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
.getBloomFilterFalsePositiveRate()), null, modificationCallbackFactory, true);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(insertDeleteOp,
@@ -1345,9 +1335,9 @@
comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp,
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- new SecondaryIndexOperationTrackerProvider(LSMBTreeIOOperationCallbackFactory.INSTANCE,
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
.getBloomFilterFalsePositiveRate()), filterFactory, modificationCallbackFactory,
false);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
@@ -1473,10 +1463,9 @@
invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- new SecondaryIndexOperationTrackerProvider(
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, storageProperties
.getBloomFilterFalsePositiveRate()), filterFactory, modificationCallbackFactory);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(insertDeleteOp,
splitsAndConstraint.second);
@@ -1568,12 +1557,11 @@
valueProviderFactories, RTreePolicyType.RTREE, primaryComparatorFactories,
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- new SecondaryIndexOperationTrackerProvider(LSMRTreeIOOperationCallbackFactory.INSTANCE,
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, proposeLinearizer(
- nestedKeyType.getTypeTag(), comparatorFactories.length),
- storageProperties.getBloomFilterFalsePositiveRate()), filterFactory,
- modificationCallbackFactory, false);
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMRTreeIOOperationCallbackFactory.INSTANCE, proposeLinearizer(nestedKeyType.getTypeTag(),
+ comparatorFactories.length), storageProperties.getBloomFilterFalsePositiveRate()),
+ filterFactory, modificationCallbackFactory, false);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeUpdate, splitsAndConstraint.second);
} catch (MetadataException | IOException e) {
throw new AlgebricksException(e);
@@ -1622,18 +1610,15 @@
int numPartitions = 0;
List<String> nodeGroup = null;
- if(dataset.getDatasetType() == DatasetType.EXTERNAL)
- {
- ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
- nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, datasetDetails.getNodeGroupName())
- .getNodeNames();
- }
- else
- {
- InternalDatasetDetails datasetDetails = (InternalDatasetDetails) dataset.getDatasetDetails();
- nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, datasetDetails.getNodeGroupName())
- .getNodeNames();
- }
+ if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+ ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+ nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, datasetDetails.getNodeGroupName())
+ .getNodeNames();
+ } else {
+ InternalDatasetDetails datasetDetails = (InternalDatasetDetails) dataset.getDatasetDetails();
+ nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, datasetDetails.getNodeGroupName())
+ .getNodeNames();
+ }
for (String nd : nodeGroup) {
numPartitions += AsterixClusterProperties.INSTANCE.getNumberOfIODevices(nd);
@@ -1653,11 +1638,11 @@
}
public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForExternalDataset(
- String dataverseName, String datasetName, String targetIdxName) throws AlgebricksException {
- FileSplit[] splits = splitsForExternalDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName);
- return splitProviderAndPartitionConstraints(splits);
- }
-
+ String dataverseName, String datasetName, String targetIdxName) throws AlgebricksException {
+ FileSplit[] splits = splitsForExternalDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName);
+ return splitProviderAndPartitionConstraints(splits);
+ }
+
public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataverse(
String dataverse) {
FileSplit[] splits = splitsForDataverse(mdTxnCtx, dataverse);
@@ -1749,55 +1734,55 @@
}
private FileSplit[] splitsForExternalDataset(MetadataTransactionContext mdTxnCtx, String dataverseName,
- String datasetName, String targetIdxName) throws AlgebricksException {
+ String datasetName, String targetIdxName) throws AlgebricksException {
- try {
- File relPathFile = new File(getRelativePath(dataverseName, datasetName + "_idx_" + targetIdxName));
- Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
- if (dataset.getDatasetType() != DatasetType.EXTERNAL) {
- throw new AlgebricksException("Not an external dataset");
- }
- ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
- List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, datasetDetails.getNodeGroupName())
- .getNodeNames();
- if (nodeGroup == null) {
- throw new AlgebricksException("Couldn't find node group " + datasetDetails.getNodeGroupName());
- }
+ try {
+ File relPathFile = new File(getRelativePath(dataverseName, datasetName + "_idx_" + targetIdxName));
+ Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+ if (dataset.getDatasetType() != DatasetType.EXTERNAL) {
+ throw new AlgebricksException("Not an external dataset");
+ }
+ ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+ List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, datasetDetails.getNodeGroupName())
+ .getNodeNames();
+ if (nodeGroup == null) {
+ throw new AlgebricksException("Couldn't find node group " + datasetDetails.getNodeGroupName());
+ }
- List<FileSplit> splitArray = new ArrayList<FileSplit>();
- for (String nd : nodeGroup) {
- String[] nodeStores = stores.get(nd);
- if (nodeStores == null) {
- LOGGER.warning("Node " + nd + " has no stores.");
- throw new AlgebricksException("Node " + nd + " has no stores.");
- } else {
- int numIODevices;
- if (datasetDetails.getNodeGroupName().compareTo(MetadataConstants.METADATA_NODEGROUP_NAME) == 0) {
- numIODevices = 1;
- } else {
- numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(nd);
- }
- String[] ioDevices = AsterixClusterProperties.INSTANCE.getIODevices(nd);
- for (int j = 0; j < nodeStores.length; j++) {
- for (int k = 0; k < numIODevices; k++) {
- File f = new File(ioDevices[k] + File.separator + nodeStores[j] + File.separator
- + relPathFile);
- splitArray.add(new FileSplit(nd, new FileReference(f), k));
- }
- }
- }
- }
- FileSplit[] splits = new FileSplit[splitArray.size()];
- int i = 0;
- for (FileSplit fs : splitArray) {
- splits[i++] = fs;
- }
- return splits;
- } catch (MetadataException me) {
- throw new AlgebricksException(me);
- }
- }
-
+ List<FileSplit> splitArray = new ArrayList<FileSplit>();
+ for (String nd : nodeGroup) {
+ String[] nodeStores = stores.get(nd);
+ if (nodeStores == null) {
+ LOGGER.warning("Node " + nd + " has no stores.");
+ throw new AlgebricksException("Node " + nd + " has no stores.");
+ } else {
+ int numIODevices;
+ if (datasetDetails.getNodeGroupName().compareTo(MetadataConstants.METADATA_NODEGROUP_NAME) == 0) {
+ numIODevices = 1;
+ } else {
+ numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(nd);
+ }
+ String[] ioDevices = AsterixClusterProperties.INSTANCE.getIODevices(nd);
+ for (int j = 0; j < nodeStores.length; j++) {
+ for (int k = 0; k < numIODevices; k++) {
+ File f = new File(ioDevices[k] + File.separator + nodeStores[j] + File.separator
+ + relPathFile);
+ splitArray.add(new FileSplit(nd, new FileReference(f), k));
+ }
+ }
+ }
+ }
+ FileSplit[] splits = new FileSplit[splitArray.size()];
+ int i = 0;
+ for (FileSplit fs : splitArray) {
+ splits[i++] = fs;
+ }
+ return splits;
+ } catch (MetadataException me) {
+ throw new AlgebricksException(me);
+ }
+ }
+
private static Map<String, String> initializeAdapterFactoryMapping() {
Map<String, String> adapterFactoryMapping = new HashMap<String, String>();
adapterFactoryMapping.put("edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter",
@@ -1811,7 +1796,7 @@
adapterFactoryMapping.put("edu.uci.ics.asterix.external.dataset.adapter.CNNFeedAdapter",
"edu.uci.ics.asterix.external.dataset.adapter.CNNFeedAdapterFactory");
adapterFactoryMapping.put("edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter",
- "edu.uci.ics.asterix.external.adapter.factory.HiveAdapterFactory");
+ "edu.uci.ics.asterix.external.adapter.factory.HiveAdapterFactory");
return adapterFactoryMapping;
}
@@ -1910,4 +1895,3 @@
}
}
-
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
index aec378b..140a8dd 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
@@ -18,7 +18,6 @@
import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.asterix.common.context.DatasetLifecycleManager;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
@@ -27,10 +26,8 @@
private static final long serialVersionUID = 1L;
private final int datasetID;
- private final ILSMIOOperationCallbackFactory ioOpCallbackFactory;
- public SecondaryIndexOperationTrackerProvider(ILSMIOOperationCallbackFactory ioOpCallbackFactory, int datasetID) {
- this.ioOpCallbackFactory = ioOpCallbackFactory;
+ public SecondaryIndexOperationTrackerProvider(int datasetID) {
this.datasetID = datasetID;
}
@@ -38,7 +35,7 @@
public ILSMOperationTracker getOperationTracker(IHyracksTaskContext ctx) {
DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) ((IAsterixAppRuntimeContext) ctx
.getJobletContext().getApplicationContext().getApplicationObject()).getIndexLifecycleManager();
- return new BaseOperationTracker(dslcManager, ioOpCallbackFactory, datasetID);
+ return new BaseOperationTracker(dslcManager, datasetID);
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
index d243dd2..cf60182 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
@@ -55,10 +55,11 @@
LSMBTree lsmBTree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, runtimeContextProvider
.getBufferCache(), runtimeContextProvider.getFileMapManager(), typeTraits, cmpFactories,
bloomFilterKeyFields, runtimeContextProvider.getBloomFilterFalsePositiveRate(), runtimeContextProvider
- .getLSMMergePolicy(), isPrimary ? runtimeContextProvider.getLSMBTreeOperationTracker(datasetID)
- : new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
- LSMBTreeIOOperationCallbackFactory.INSTANCE, datasetID), runtimeContextProvider
- .getLSMIOScheduler(), runtimeContextProvider.getLSMBTreeIOOperationCallbackProvider(isPrimary));
+ .getLSMMergePolicy(),
+ isPrimary ? runtimeContextProvider.getLSMBTreeOperationTracker(datasetID) : new BaseOperationTracker(
+ (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(), datasetID),
+ runtimeContextProvider.getLSMIOScheduler(), LSMBTreeIOOperationCallbackFactory.INSTANCE
+ .createIOOperationCallback());
return lsmBTree;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
index 8482172..5ed7019 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
@@ -59,25 +59,37 @@
List<IVirtualBufferCache> virtualBufferCaches = runtimeContextProvider.getVirtualBufferCaches(datasetID);
try {
if (isPartitioned) {
- return InvertedIndexUtils.createPartitionedLSMInvertedIndex(virtualBufferCaches, runtimeContextProvider
- .getFileMapManager(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
- tokenCmpFactories, tokenizerFactory, runtimeContextProvider.getBufferCache(), filePath,
- runtimeContextProvider.getBloomFilterFalsePositiveRate(), runtimeContextProvider
- .getLSMMergePolicy(), new BaseOperationTracker(
- (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, datasetID), runtimeContextProvider
- .getLSMIOScheduler(), runtimeContextProvider
- .getLSMInvertedIndexIOOperationCallbackProvider());
+ return InvertedIndexUtils.createPartitionedLSMInvertedIndex(
+ virtualBufferCaches,
+ runtimeContextProvider.getFileMapManager(),
+ invListTypeTraits,
+ invListCmpFactories,
+ tokenTypeTraits,
+ tokenCmpFactories,
+ tokenizerFactory,
+ runtimeContextProvider.getBufferCache(),
+ filePath,
+ runtimeContextProvider.getBloomFilterFalsePositiveRate(),
+ runtimeContextProvider.getLSMMergePolicy(),
+ new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider
+ .getIndexLifecycleManager(), datasetID), runtimeContextProvider.getLSMIOScheduler(),
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE.createIOOperationCallback());
} else {
- return InvertedIndexUtils.createLSMInvertedIndex(virtualBufferCaches, runtimeContextProvider
- .getFileMapManager(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
- tokenCmpFactories, tokenizerFactory, runtimeContextProvider.getBufferCache(), filePath,
- runtimeContextProvider.getBloomFilterFalsePositiveRate(), runtimeContextProvider
- .getLSMMergePolicy(), new BaseOperationTracker(
- (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, datasetID), runtimeContextProvider
- .getLSMIOScheduler(), runtimeContextProvider
- .getLSMInvertedIndexIOOperationCallbackProvider());
+ return InvertedIndexUtils.createLSMInvertedIndex(
+ virtualBufferCaches,
+ runtimeContextProvider.getFileMapManager(),
+ invListTypeTraits,
+ invListCmpFactories,
+ tokenTypeTraits,
+ tokenCmpFactories,
+ tokenizerFactory,
+ runtimeContextProvider.getBufferCache(),
+ filePath,
+ runtimeContextProvider.getBloomFilterFalsePositiveRate(),
+ runtimeContextProvider.getLSMMergePolicy(),
+ new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider
+ .getIndexLifecycleManager(), datasetID), runtimeContextProvider.getLSMIOScheduler(),
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE.createIOOperationCallback());
}
} catch (IndexException e) {
throw new HyracksDataException(e);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
index bc1e889..dc6b30b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
@@ -66,10 +66,9 @@
runtimeContextProvider.getFileMapManager(), typeTraits, rtreeCmpFactories, btreeCmpFactories,
valueProviderFactories, rtreePolicyType, runtimeContextProvider.getBloomFilterFalsePositiveRate(),
runtimeContextProvider.getLSMMergePolicy(), new BaseOperationTracker(
- (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
- LSMRTreeIOOperationCallbackFactory.INSTANCE, datasetID), runtimeContextProvider
- .getLSMIOScheduler(), runtimeContextProvider.getLSMRTreeIOOperationCallbackProvider(),
- linearizeCmpFactory);
+ (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(), datasetID),
+ runtimeContextProvider.getLSMIOScheduler(), LSMRTreeIOOperationCallbackFactory.INSTANCE
+ .createIOOperationCallback(), linearizeCmpFactory);
} catch (TreeIndexException e) {
throw new HyracksDataException(e);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
index 6f6da4a..dca14d8 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
@@ -16,8 +16,8 @@
import java.util.List;
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import edu.uci.ics.asterix.common.transactions.IRecoveryManager;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
@@ -70,7 +70,8 @@
long firstLSN;
if (openIndexList.size() > 0) {
for (IIndex index : openIndexList) {
- firstLSN = ((BaseOperationTracker) ((ILSMIndex) index).getOperationTracker()).getFirstLSN();
+ firstLSN = ((AbstractLSMIOOperationCallback) ((ILSMIndex) index).getIOOperationCallback())
+ .getFirstLSN();
minMCTFirstLSN = Math.min(minMCTFirstLSN, firstLSN);
}
} else {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 2ad3055..d0e9348 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -38,7 +38,6 @@
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
@@ -297,10 +296,8 @@
//#. get maxDiskLastLSN
ILSMIndex lsmIndex = (ILSMIndex) index;
- BaseOperationTracker indexOpTracker = (BaseOperationTracker) lsmIndex.getOperationTracker();
- AbstractLSMIOOperationCallback abstractLSMIOCallback = (AbstractLSMIOOperationCallback) indexOpTracker
- .getIOOperationCallback();
- maxDiskLastLsn = abstractLSMIOCallback.getComponentLSN(index.getImmutableComponents());
+ maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
+ .getComponentLSN(lsmIndex.getImmutableComponents());
//#. set resourceId and maxDiskLastLSN to the map
resourceId2MaxLSNMap.put(Long.valueOf(resourceId), Long.valueOf(maxDiskLastLsn));
@@ -375,9 +372,8 @@
ILSMIndex lsmIndex = (ILSMIndex) index;
ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
- BaseOperationTracker indexOpTracker = (BaseOperationTracker) lsmIndex.getOperationTracker();
BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(
- indexOpTracker.getIOOperationCallback());
+ lsmIndex.getIOOperationCallback());
callbackList.add(cb);
try {
indexAccessor.scheduleFlush(cb);
@@ -399,7 +395,8 @@
minMCTFirstLSN = Long.MAX_VALUE;
if (openIndexList.size() > 0) {
for (IIndex index : openIndexList) {
- firstLSN = ((BaseOperationTracker) ((ILSMIndex) index).getOperationTracker()).getFirstLSN();
+ firstLSN = ((AbstractLSMIOOperationCallback) ((ILSMIndex) index).getIOOperationCallback())
+ .getFirstLSN();
minMCTFirstLSN = Math.min(minMCTFirstLSN, firstLSN);
}
} else {
@@ -614,8 +611,8 @@
loserTxnTable.remove(tempKeyTxnId);
if (IS_DEBUG_MODE) {
entityCommitLogCount++;
- System.out.println("" + Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]"
- + tempKeyTxnId);
+ System.out.println("" + Thread.currentThread().getId() + "======> entity_commit[" + currentLSN
+ + "]" + tempKeyTxnId);
}
break;
@@ -721,7 +718,7 @@
} else {
throw new IllegalStateException("Unsupported OperationType: " + logRecord.getNewOp());
}
- ((BaseOperationTracker) index.getOperationTracker()).updateLastLSN(logRecord.getLSN());
+ ((AbstractLSMIOOperationCallback) index.getIOOperationCallback()).updateLastLSN(logRecord.getLSN());
} catch (Exception e) {
throw new IllegalStateException("Failed to redo", e);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
index 59a8363..5d4806d 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
@@ -15,15 +15,11 @@
package edu.uci.ics.asterix.transaction.management.service.transaction;
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
@@ -33,18 +29,12 @@
import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactory;
public class AsterixRuntimeComponentsProvider implements IIndexLifecycleManagerProvider, IStorageManagerInterface,
- ILSMIOOperationSchedulerProvider, ILSMMergePolicyProvider,
- ILSMIOOperationCallbackProvider {
+ ILSMIOOperationSchedulerProvider, ILSMMergePolicyProvider {
private static final long serialVersionUID = 1L;
public static final AsterixRuntimeComponentsProvider RUNTIME_PROVIDER = new AsterixRuntimeComponentsProvider();
-
- private AsterixRuntimeComponentsProvider() {
- }
- @Override
- public ILSMIOOperationCallback getIOOperationCallback(ILSMIndex index) {
- return ((BaseOperationTracker) index.getOperationTracker()).getIOOperationCallback();
+ private AsterixRuntimeComponentsProvider() {
}
@Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index 678956b..c54cb7f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -21,9 +21,9 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.asterix.common.context.PrimaryIndexOperationTracker;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
@@ -71,7 +71,7 @@
//indexMap is concurrently accessed by multiple threads,
//so those threads are synchronized on indexMap object itself
- private Map<MutableLong, BaseOperationTracker> indexMap;
+ private Map<MutableLong, AbstractLSMIOOperationCallback> indexMap;
//TODO: fix ComponentLSNs' issues.
//primaryIndex, primaryIndexCallback, and primaryIndexOptracker will be modified accordingly
@@ -97,7 +97,7 @@
isTimeout = false;
isWriteTxn = new AtomicBoolean(false);
isMetadataTxn = false;
- indexMap = new HashMap<MutableLong, BaseOperationTracker>();
+ indexMap = new HashMap<MutableLong, AbstractLSMIOOperationCallback>();
primaryIndex = null;
tempResourceIdForRegister = new MutableLong();
tempResourceIdForSetLSN = new MutableLong();
@@ -114,7 +114,8 @@
}
tempResourceIdForRegister.set(resourceId);
if (!indexMap.containsKey(tempResourceIdForRegister)) {
- indexMap.put(new MutableLong(resourceId), ((BaseOperationTracker) index.getOperationTracker()));
+ indexMap.put(new MutableLong(resourceId),
+ ((AbstractLSMIOOperationCallback) index.getIOOperationCallback()));
}
}
}
@@ -122,16 +123,14 @@
//[Notice]
//This method is called sequentially by the LogAppender threads.
//However, the indexMap is concurrently read and modified through this method and registerIndexAndCallback()
- //TODO: fix issues - 591, 609, 612, and 614.
@Override
public void setLastLSN(long resourceId, long LSN) {
synchronized (indexMap) {
firstLSN.compareAndSet(-1, LSN);
lastLSN.set(Math.max(lastLSN.get(), LSN));
tempResourceIdForSetLSN.set(resourceId);
- //TODO; create version number tracker and keep LSNs there.
- BaseOperationTracker opTracker = indexMap.get(tempResourceIdForSetLSN);
- opTracker.updateLastLSN(LSN);
+ AbstractLSMIOOperationCallback ioOpCallback = indexMap.get(tempResourceIdForSetLSN);
+ ioOpCallback.updateLastLSN(LSN);
}
}