Added many fixes. Checkpointpoing.
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 23f1d56..56fc0e7 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -20,6 +20,7 @@
import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
import edu.uci.ics.asterix.common.dataflow.IAsterixApplicationContextInfo;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
@@ -38,6 +39,7 @@
import edu.uci.ics.asterix.optimizer.rules.am.InvertedIndexAccessMethod;
import edu.uci.ics.asterix.optimizer.rules.am.InvertedIndexAccessMethod.SearchModifierType;
import edu.uci.ics.asterix.optimizer.rules.am.InvertedIndexJobGenParams;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -217,18 +219,18 @@
if (!isPartitioned) {
dataflowHelperFactory = new LSMInvertedIndexDataflowHelperFactory(
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate());
} else {
dataflowHelperFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate());
}
LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp = new LSMInvertedIndexSearchOperatorDescriptor(
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
index 439feff..ee9dfae 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
@@ -96,26 +96,22 @@
@Override
public ILSMIOOperationCallbackProvider getNoOpIOOperationCallbackProvider() {
- return AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER;
+ return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
}
@Override
public ILSMIOOperationCallbackProvider getLSMBTreeIOOperationCallbackProvider(boolean isPrimary) {
- if (isPrimary) {
- return AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER;
- } else {
- return AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER;
- }
+ return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
}
@Override
public ILSMIOOperationCallbackProvider getLSMRTreeIOOperationCallbackProvider() {
- return AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER;
+ return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
}
@Override
public ILSMIOOperationCallbackProvider getLSMInvertedIndexIOOperationCallbackProvider() {
- return AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER;
+ return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
}
@Override
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
index 8086b0d..9af6c4c 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
@@ -123,12 +123,12 @@
datasetName);
AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
IndexDropOperatorDescriptor primaryBtreeDrop = new IndexDropOperatorDescriptor(specPrimary,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate()));
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop,
splitsAndConstraint.second);
@@ -175,13 +175,12 @@
localResourceMetadata, LocalResource.LSMBTreeResource);
TreeIndexCreateOperatorDescriptor indexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
splitsAndConstraint.first, typeTraits, comparatorFactories, blooFilterKeyFields,
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER, storageProperties
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new PrimaryIndexOperationTrackerProvider(
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
.getBloomFilterFalsePositiveRate()), localResourceFactoryProvider,
NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
@@ -260,15 +259,15 @@
TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad;
if (!loadStmt.alreadySorted()) {
btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, splitsAndConstraint.first, typeTraits,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, splitsAndConstraint.first, typeTraits,
comparatorFactories, blooFilterKeyFields, fieldPermutation, GlobalConfig.DEFAULT_BTREE_FILL_FACTOR,
true, numElementsHint, true, new LSMBTreeDataflowHelperFactory(
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate()), NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeBulkLoad,
splitsAndConstraint.second);
@@ -286,15 +285,15 @@
spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
} else {
btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, splitsAndConstraint.first, typeTraits,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, splitsAndConstraint.first, typeTraits,
comparatorFactories, blooFilterKeyFields, fieldPermutation, GlobalConfig.DEFAULT_BTREE_FILL_FACTOR,
- true, numElementsHint, true, new LSMBTreeDataflowHelperFactory(
+ false, numElementsHint, true, new LSMBTreeDataflowHelperFactory(
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate()), NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeBulkLoad,
splitsAndConstraint.second);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
index 7bd6c69..af56894 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
@@ -18,10 +18,12 @@
import edu.uci.ics.asterix.common.config.OptimizationConfUtil;
import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledIndexDropStatement;
@@ -66,12 +68,12 @@
.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverseName, datasetName, indexName);
AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset
+ .getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate()));
AlgebricksPartitionConstraintHelper
.setPartitionConstraintInJobSpec(spec, btreeDrop, splitsAndConstraint.second);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
index 052368a..e3832d4 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
@@ -19,6 +19,8 @@
import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
import edu.uci.ics.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
@@ -60,13 +62,13 @@
localResourceMetadata, LocalResource.LSMBTreeResource);
TreeIndexCreateOperatorDescriptor secondaryIndexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
secondaryFileSplitProvider, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories,
secondaryBloomFilterKeyFields, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset
+ .getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate()), localResourceFactoryProvider,
NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
@@ -104,10 +106,10 @@
spec,
numSecondaryKeys,
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER, storageProperties
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
+ LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
.getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
// Connect the operators.
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
index df6470f..f716387 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
@@ -290,14 +290,13 @@
ResourceType.LSM_BTREE);
AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
primaryFileSplitProvider, primaryRecDesc.getTypeTraits(), primaryComparatorFactories,
primaryBloomFilterKeyFields, lowKeyFields, highKeyFields, true, true,
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new PrimaryIndexOperationTrackerProvider(
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate()), false, searchCallbackFactory);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
primaryPartitionConstraint);
@@ -351,11 +350,10 @@
fieldPermutation[i] = i;
}
TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER, secondaryFileSplitProvider,
- secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, secondaryBloomFilterKeyFields,
- fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory,
- NoOpOperationCallbackFactory.INSTANCE);
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ secondaryFileSplitProvider, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories,
+ secondaryBloomFilterKeyFields, fieldPermutation, fillFactor, false, numElementsHint, false,
+ dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
secondaryPartitionConstraint);
return treeIndexBulkLoadOp;
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
index 17590c5..40e0aa9 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
@@ -22,11 +22,13 @@
import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
import edu.uci.ics.asterix.runtime.formats.FormatUtils;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
import edu.uci.ics.asterix.transaction.management.resource.LSMInvertedIndexLocalResourceMetadata;
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
@@ -176,8 +178,8 @@
IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory();
LSMInvertedIndexCreateOperatorDescriptor invIndexCreateOp = new LSMInvertedIndexCreateOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, secondaryFileSplitProvider,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, tokenTypeTraits, tokenComparatorFactories,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, tokenTypeTraits, tokenComparatorFactories,
invListsTypeTraits, primaryComparatorFactories, tokenizerFactory, dataflowHelperFactory,
localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexCreateOp,
@@ -267,8 +269,8 @@
IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory();
LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new LSMInvertedIndexBulkLoadOperatorDescriptor(
spec, fieldPermutation, false, numElementsHint, false,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, secondaryFileSplitProvider,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, tokenTypeTraits, tokenComparatorFactories,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, tokenTypeTraits, tokenComparatorFactories,
invListsTypeTraits, primaryComparatorFactories, tokenizerFactory, dataflowHelperFactory,
NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexBulkLoadOp,
@@ -280,17 +282,17 @@
AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
if (!isPartitioned) {
return new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate());
} else {
return new PartitionedLSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate());
}
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
index 17632aa..ec62068 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
@@ -21,6 +21,7 @@
import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
@@ -30,6 +31,7 @@
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
import edu.uci.ics.asterix.transaction.management.resource.LSMRTreeLocalResourceMetadata;
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
@@ -84,14 +86,14 @@
localResourceMetadata, LocalResource.LSMRTreeResource);
TreeIndexCreateOperatorDescriptor secondaryIndexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
secondaryFileSplitProvider, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, null,
new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, AqlMetadataProvider.proposeLinearizer(
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
+ LSMRTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AqlMetadataProvider.proposeLinearizer(
keyType, secondaryComparatorFactories.length), storageProperties
.getBloomFilterFalsePositiveRate()), localResourceFactoryProvider,
NoOpOperationCallbackFactory.INSTANCE);
@@ -174,10 +176,10 @@
numNestedSecondaryKeyFields,
new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, AqlMetadataProvider.proposeLinearizer(
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
+ LSMRTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AqlMetadataProvider.proposeLinearizer(
keyType, secondaryComparatorFactories.length), storageProperties
.getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
diff --git a/asterix-app/src/test/resources/runtimets/queries/dml/insert-and-scan-dataset-with-index/insert-and-scan-dataset-with-index.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/dml/insert-and-scan-dataset-with-index/insert-and-scan-dataset-with-index.2.update.aql
index cf7b309..29f6242 100644
--- a/asterix-app/src/test/resources/runtimets/queries/dml/insert-and-scan-dataset-with-index/insert-and-scan-dataset-with-index.2.update.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/dml/insert-and-scan-dataset-with-index/insert-and-scan-dataset-with-index.2.update.aql
@@ -8,8 +8,6 @@
use dataverse test;
-use dataverse test;
-
load dataset test.employee
using "edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter"
(("path"="nc1://data/names.adm"),("format"="delimited-text"),("delimiter"="|"));
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
index c48115c..3610478 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
@@ -26,13 +26,18 @@
public class BaseOperationTracker implements ILSMOperationTracker {
+ protected final DatasetLifecycleManager datasetLifecycleManager;
protected final ILSMIOOperationCallback ioOpCallback;
protected long lastLSN;
protected long firstLSN;
+ protected final int datasetID;
- public BaseOperationTracker(ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
+ public BaseOperationTracker(DatasetLifecycleManager datasetLifecycleManager,
+ ILSMIOOperationCallbackFactory ioOpCallbackFactory, int datasetID) {
+ this.datasetLifecycleManager = datasetLifecycleManager;
this.ioOpCallback = ioOpCallbackFactory == null ? NoOpIOOperationCallback.INSTANCE : ioOpCallbackFactory
.createIOOperationCallback(this);
+ this.datasetID = datasetID;
resetLSNs();
}
@@ -63,11 +68,17 @@
@Override
public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
+ if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+ datasetLifecycleManager.declareActiveIOOperation(datasetID);
+ }
}
@Override
public void afterOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
+ if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+ datasetLifecycleManager.undeclareActiveIOOperation(datasetID);
+ }
}
@Override
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java
index 3ffa73d..867c5e3 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java
@@ -34,8 +34,9 @@
this.ctx = ctx;
}
- public void diskComponentAdded(final ILSMIndex index, int totalNumDiskComponents) throws HyracksDataException,
- IndexException {
+ @Override
+ public void diskComponentAdded(final ILSMIndex index) throws HyracksDataException, IndexException {
+ int totalNumDiskComponents = index.getImmutableComponents().size();
if (!ctx.isShuttingdown() && totalNumDiskComponents >= threshold) {
ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
index a4d4daa..17ad331 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
@@ -96,32 +96,65 @@
}
@Override
- public synchronized void unregister(long resourceID) throws HyracksDataException {
- int did = getDIDfromRID(resourceID);
- DatasetInfo dsInfo = datasetInfos.get(did);
- IndexInfo iInfo = dsInfo.indexes.remove(resourceID);
- if (dsInfo == null || iInfo == null) {
- throw new HyracksDataException("Index with resource ID " + resourceID + " does not exist.");
- }
+ public void unregister(long resourceID) throws HyracksDataException {
+ int did;
+ PrimaryIndexOperationTracker opTracker;
+ DatasetInfo dsInfo;
+ IndexInfo iInfo;
+ synchronized (this) {
+ did = getDIDfromRID(resourceID);
+ dsInfo = datasetInfos.get(did);
+ iInfo = dsInfo.indexes.get(resourceID);
- if (iInfo.referenceCount != 0) {
- dsInfo.indexes.put(resourceID, iInfo);
- throw new HyracksDataException("Cannot remove index while it is open.");
- }
+ if (dsInfo == null || iInfo == null) {
+ throw new HyracksDataException("Index with resource ID " + resourceID + " does not exist.");
+ }
+ opTracker = (PrimaryIndexOperationTracker) datasetOpTrackers.get(dsInfo.datasetID);
+ if (iInfo.referenceCount != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
+ throw new HyracksDataException("Cannot remove index while it is open.");
+ }
+
+ while (dsInfo.numActiveIOOps > 0) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
if (iInfo.isOpen) {
iInfo.index.deactivate(true);
}
- if (dsInfo.referenceCount == 0 && dsInfo.isOpen && dsInfo.indexes.isEmpty()) {
- List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(did);
- assert vbcs != null;
- for (IVirtualBufferCache vbc : vbcs) {
- used -= (vbc.getNumPages() * vbc.getPageSize());
+ synchronized (this) {
+ dsInfo.indexes.remove(resourceID);
+ if (dsInfo.referenceCount == 0 && dsInfo.isOpen && dsInfo.indexes.isEmpty()) {
+ List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(did);
+ assert vbcs != null;
+ for (IVirtualBufferCache vbc : vbcs) {
+ used -= (vbc.getNumPages() * vbc.getPageSize());
+ }
+ datasetInfos.remove(did);
}
- datasetInfos.remove(did);
}
+ }
+ public synchronized void declareActiveIOOperation(int datasetID) throws HyracksDataException {
+ DatasetInfo dsInfo = datasetInfos.get(datasetID);
+ if (dsInfo == null) {
+ throw new HyracksDataException("Failed to find a dataset with ID " + datasetID);
+ }
+ dsInfo.incrementActiveIOOps();
+ }
+
+ public synchronized void undeclareActiveIOOperation(int datasetID) throws HyracksDataException {
+ DatasetInfo dsInfo = datasetInfos.get(datasetID);
+ if (dsInfo == null) {
+ throw new HyracksDataException("Failed to find a dataset with ID " + datasetID);
+ }
+ dsInfo.decrementActiveIOOps();
+ notifyAll();
}
@Override
@@ -171,9 +204,17 @@
List<DatasetInfo> datasetInfosList = new ArrayList<DatasetInfo>(datasetInfos.values());
Collections.sort(datasetInfosList);
for (DatasetInfo dsInfo : datasetInfosList) {
- ILSMOperationTracker opTracker = datasetOpTrackers.get(dsInfo.datasetID);
- if (opTracker != null && ((PrimaryIndexOperationTracker) opTracker).isActiveDataset()
- && dsInfo.referenceCount == 0 && dsInfo.isOpen) {
+ PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) datasetOpTrackers
+ .get(dsInfo.datasetID);
+ if (opTracker != null && opTracker.getNumActiveOperations() == 0 && dsInfo.referenceCount == 0
+ && dsInfo.isOpen) {
+ while (dsInfo.numActiveIOOps > 0) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
for (IndexInfo iInfo : dsInfo.indexes.values()) {
if (iInfo.isOpen) {
iInfo.index.deactivate(true);
@@ -295,6 +336,7 @@
private final Map<Long, IndexInfo> indexes;
private final int datasetID;
private long lastAccess;
+ private int numActiveIOOps;
public DatasetInfo(int datasetID) {
this.indexes = new HashMap<Long, IndexInfo>();
@@ -312,6 +354,14 @@
lastAccess = System.currentTimeMillis();
}
+ public void incrementActiveIOOps() {
+ numActiveIOOps++;
+ }
+
+ public void decrementActiveIOOps() {
+ numActiveIOOps--;
+ }
+
@Override
public int compareTo(DatasetInfo i) {
// sort by (isOpen, referenceCount, lastAccess) ascending, where true < false
@@ -391,4 +441,4 @@
}
outputStream.write(sb.toString().getBytes());
}
-}
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
index fabcc35..b83fe8a 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -15,7 +15,6 @@
package edu.uci.ics.asterix.common.context;
-import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -26,41 +25,34 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
public class PrimaryIndexOperationTracker extends BaseOperationTracker {
- private final DatasetLifecycleManager datasetLifecycleManager;
- private final List<IVirtualBufferCache> datasetBufferCaches;
- private final int datasetID;
// Number of active operations on a ILSMIndex instance.
- private AtomicInteger[] numActiveOperations;
+ private AtomicInteger numActiveOperations;
public PrimaryIndexOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID,
ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
- super(ioOpCallbackFactory);
- this.datasetLifecycleManager = datasetLifecycleManager;
- this.datasetID = datasetID;
- this.datasetBufferCaches = datasetLifecycleManager.getVirtualBufferCaches(datasetID);
- this.numActiveOperations = new AtomicInteger[datasetBufferCaches.size()];
- for (int i = 0; i < numActiveOperations.length; i++) {
- this.numActiveOperations[i] = new AtomicInteger(0);
- }
+ super(datasetLifecycleManager, ioOpCallbackFactory, datasetID);
+ this.numActiveOperations = new AtomicInteger();
}
@Override
public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
- numActiveOperations[index.getCurrentMutableComponentId()].incrementAndGet();
+ if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
+ numActiveOperations.incrementAndGet();
+ } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+ datasetLifecycleManager.declareActiveIOOperation(datasetID);
+ }
}
@Override
public void afterOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
// Searches are immediately considered complete, because they should not prevent the execution of flushes.
- if (opType == LSMOperationType.SEARCH || opType == LSMOperationType.NOOP || opType == LSMOperationType.FLUSH
- || opType == LSMOperationType.MERGE) {
+ if (opType == LSMOperationType.SEARCH || opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
completeOperation(index, opType, searchCallback, modificationCallback);
}
}
@@ -68,39 +60,42 @@
@Override
public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
- int nActiveOps = numActiveOperations[index.getCurrentMutableComponentId()].decrementAndGet();
+ if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
+ numActiveOperations.decrementAndGet();
+ } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+ datasetLifecycleManager.undeclareActiveIOOperation(datasetID);
+ }
- if (opType != LSMOperationType.FLUSH) {
- flushIfFull(nActiveOps);
+ if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
+ flushIfFull();
}
}
- private void flushIfFull(int componentId, int nActiveOps) throws HyracksDataException {
- // If we need a flush, and this is the last completing operation, then schedule the flush.
- if (datasetBufferCaches.get(componentId).isFull() && nActiveOps == 0) {
- Set<ILSMIndex> indexes = datasetLifecycleManager.getDatasetIndexes(datasetID);
+ private void flushIfFull() throws HyracksDataException {
+ Set<ILSMIndex> indexes = datasetLifecycleManager.getDatasetIndexes(datasetID);
+ // If we need a flush, and this is the last completing operation, then schedule the flush.
+ boolean needsFlush = false;
+ for (ILSMIndex lsmIndex : indexes) {
+ if (lsmIndex.getFlushStatus()) {
+ needsFlush = true;
+ break;
+ }
+ }
+ if (needsFlush && numActiveOperations.get() == 0) {
for (ILSMIndex lsmIndex : indexes) {
ILSMIndexAccessor accessor = (ILSMIndexAccessor) lsmIndex.createAccessor(
NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
accessor.scheduleFlush(((BaseOperationTracker) lsmIndex.getOperationTracker()).getIOOperationCallback());
}
-
}
}
public void exclusiveJobCommitted() throws HyracksDataException {
- for (int i = 0; i < numActiveOperations.length; i++) {
- numActiveOperations[i].set(0);
- flushIfFull(i, 0);
- }
+ numActiveOperations.set(0);
+ flushIfFull();
}
- public boolean isActiveDataset() {
- for (int i = 0; i < numActiveOperations.length; i++) {
- if (numActiveOperations[i].get() > 0) {
- return false;
- }
- }
- return true;
+ public int getNumActiveOperations() {
+ return numActiveOperations.get();
}
}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
index fdc065e..31f3e64 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
@@ -68,5 +68,7 @@
}
public void setExclusiveJobLevelCommit();
+
+ public boolean isExlusiveJobLevelCommit();
}
diff --git a/asterix-common/src/test/java/edu/uci/ics/asterix/test/aql/TestsUtils.java b/asterix-common/src/test/java/edu/uci/ics/asterix/test/aql/TestsUtils.java
index 88c65ff..ca9b232 100644
--- a/asterix-common/src/test/java/edu/uci/ics/asterix/test/aql/TestsUtils.java
+++ b/asterix-common/src/test/java/edu/uci/ics/asterix/test/aql/TestsUtils.java
@@ -325,6 +325,7 @@
List<CompilationUnit> cUnits = testCaseCtx.getTestCase().getCompilationUnit();
for (CompilationUnit cUnit : cUnits) {
+ System.out.println(cUnit.getName());
testFileCtxs = testCaseCtx.getTestFiles(cUnit);
expectedResultFileCtxs = testCaseCtx.getExpectedResultFiles(cUnit);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 82fee45..4f67cb5 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -33,6 +33,7 @@
import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
import edu.uci.ics.asterix.common.context.BaseOperationTracker;
+import edu.uci.ics.asterix.common.context.DatasetLifecycleManager;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import edu.uci.ics.asterix.common.transactions.IResourceManager.ResourceType;
@@ -340,10 +341,10 @@
int[] bloomFilterKeyFields = index.getBloomFilterKeyFields();
LSMBTree lsmBtree = null;
long resourceID = -1;
- AsterixRuntimeComponentsProvider rtcProvider = index.isPrimaryIndex() ? AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER
- : AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER;
+ AsterixRuntimeComponentsProvider rtcProvider = AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
ILSMOperationTracker opTracker = index.isPrimaryIndex() ? runtimeContext.getLSMBTreeOperationTracker(index
- .getDatasetId().getId()) : new BaseOperationTracker(LSMBTreeIOOperationCallbackFactory.INSTANCE);
+ .getDatasetId().getId()) : new BaseOperationTracker((DatasetLifecycleManager) indexLifecycleManager,
+ LSMBTreeIOOperationCallbackFactory.INSTANCE, index.getDatasetId().getId());
if (create) {
lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider, typeTraits,
comparatorFactories, bloomFilterKeyFields, runtimeContext.getBloomFilterFalsePositiveRate(),
@@ -366,7 +367,7 @@
typeTraits, comparatorFactories, bloomFilterKeyFields,
runtimeContext.getBloomFilterFalsePositiveRate(), runtimeContext.getLSMMergePolicy(),
opTracker, runtimeContext.getLSMIOScheduler(),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER);
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
indexLifecycleManager.register(resourceID, lsmBtree);
}
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 1ade7ef..a3a86bb 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -33,21 +33,13 @@
import edu.uci.ics.asterix.common.dataflow.AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor;
import edu.uci.ics.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
import edu.uci.ics.asterix.common.dataflow.IAsterixApplicationContextInfo;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
import edu.uci.ics.asterix.common.parse.IParseFileSplitsDecl;
import edu.uci.ics.asterix.common.transactions.IResourceManager.ResourceType;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
-import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
-import edu.uci.ics.asterix.external.adapter.factory.IAdapterFactory;
-import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
-import edu.uci.ics.asterix.external.adapter.factory.ITypedDatasetAdapterFactory;
-import edu.uci.ics.asterix.external.data.operator.ExternalDataScanOperatorDescriptor;
-import edu.uci.ics.asterix.external.data.operator.FeedIntakeOperatorDescriptor;
-import edu.uci.ics.asterix.external.data.operator.FeedMessageOperatorDescriptor;
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
-import edu.uci.ics.asterix.external.dataset.adapter.ITypedDatasourceAdapter;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
-import edu.uci.ics.asterix.external.feed.lifecycle.IFeedMessage;
import edu.uci.ics.asterix.formats.base.IDataFormat;
import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
@@ -64,6 +56,13 @@
import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
+import edu.uci.ics.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.FeedId;
+import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.FeedMessageOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.types.ARecordType;
@@ -81,6 +80,7 @@
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexSearchOperationCallbackFactory;
import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -569,14 +569,14 @@
primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
}
}
- AsterixRuntimeComponentsProvider rtcProvider = isSecondary ? AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER
- : AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER;
+ AsterixRuntimeComponentsProvider rtcProvider = AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), spPc.first,
typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields,
lowKeyInclusive, highKeyInclusive, new LSMBTreeDataflowHelperFactory(
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), rtcProvider,
- isSecondary ? AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER
+ isSecondary ? new SecondaryIndexOperationTrackerProvider(
+ LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId())
: new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()), rtcProvider,
rtcProvider, storageProperties.getBloomFilterFalsePositiveRate()), retainInput,
searchCallbackFactory);
@@ -644,10 +644,10 @@
typeTraits, comparatorFactories, keyFields, new LSMRTreeDataflowHelperFactory(
valueProviderFactories, RTreePolicyType.RTREE, primaryComparatorFactories,
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, proposeLinearizer(
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(LSMRTreeIOOperationCallbackFactory.INSTANCE,
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, proposeLinearizer(
nestedKeyType.getTypeTag(), comparatorFactories.length),
storageProperties.getBloomFilterFalsePositiveRate()), retainInput, searchCallbackFactory);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeSearchOp, spPc.second);
@@ -802,10 +802,10 @@
splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation,
GlobalConfig.DEFAULT_BTREE_FILL_FACTOR, false, numElementsHint, true,
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate()), NoOpOperationCallbackFactory.INSTANCE);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
splitsAndConstraint.second);
@@ -870,10 +870,10 @@
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp,
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER, storageProperties
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
.getBloomFilterFalsePositiveRate()), null, modificationCallbackFactory, true);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(insertDeleteOp,
@@ -1066,10 +1066,10 @@
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp,
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER, storageProperties
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(LSMBTreeIOOperationCallbackFactory.INSTANCE,
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
.getBloomFilterFalsePositiveRate()), filterFactory, modificationCallbackFactory,
false);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
@@ -1194,10 +1194,11 @@
appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER, storageProperties
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
.getBloomFilterFalsePositiveRate()), filterFactory, modificationCallbackFactory);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(insertDeleteOp,
splitsAndConstraint.second);
@@ -1288,10 +1289,10 @@
comparatorFactories, null, fieldPermutation, indexOp, new LSMRTreeDataflowHelperFactory(
valueProviderFactories, RTreePolicyType.RTREE, primaryComparatorFactories,
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, proposeLinearizer(
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(LSMRTreeIOOperationCallbackFactory.INSTANCE,
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, proposeLinearizer(
nestedKeyType.getTypeTag(), comparatorFactories.length),
storageProperties.getBloomFilterFalsePositiveRate()), filterFactory,
modificationCallbackFactory, false);
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixAppContextInfo.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixAppContextInfo.java
index 6cdaea0..0f836af 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixAppContextInfo.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixAppContextInfo.java
@@ -100,11 +100,11 @@
@Override
public IIndexLifecycleManagerProvider getIndexLifecycleManagerProvider() {
- return AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER;
+ return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
}
@Override
public IStorageManagerInterface getStorageManagerInterface() {
- return AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER;
+ return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
index 3661537..aec378b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
@@ -14,7 +14,9 @@
*/
package edu.uci.ics.asterix.transaction.management.opcallbacks;
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
import edu.uci.ics.asterix.common.context.BaseOperationTracker;
+import edu.uci.ics.asterix.common.context.DatasetLifecycleManager;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -24,15 +26,19 @@
private static final long serialVersionUID = 1L;
+ private final int datasetID;
private final ILSMIOOperationCallbackFactory ioOpCallbackFactory;
- public SecondaryIndexOperationTrackerProvider(ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
+ public SecondaryIndexOperationTrackerProvider(ILSMIOOperationCallbackFactory ioOpCallbackFactory, int datasetID) {
this.ioOpCallbackFactory = ioOpCallbackFactory;
+ this.datasetID = datasetID;
}
@Override
public ILSMOperationTracker getOperationTracker(IHyracksTaskContext ctx) {
- return new BaseOperationTracker(ioOpCallbackFactory);
+ DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) ((IAsterixAppRuntimeContext) ctx
+ .getJobletContext().getApplicationContext().getApplicationObject()).getIndexLifecycleManager();
+ return new BaseOperationTracker(dslcManager, ioOpCallbackFactory, datasetID);
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
index cd894a1..d243dd2 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
@@ -18,6 +18,7 @@
import java.util.List;
import edu.uci.ics.asterix.common.context.BaseOperationTracker;
+import edu.uci.ics.asterix.common.context.DatasetLifecycleManager;
import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -55,7 +56,8 @@
.getBufferCache(), runtimeContextProvider.getFileMapManager(), typeTraits, cmpFactories,
bloomFilterKeyFields, runtimeContextProvider.getBloomFilterFalsePositiveRate(), runtimeContextProvider
.getLSMMergePolicy(), isPrimary ? runtimeContextProvider.getLSMBTreeOperationTracker(datasetID)
- : new BaseOperationTracker(LSMBTreeIOOperationCallbackFactory.INSTANCE), runtimeContextProvider
+ : new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
+ LSMBTreeIOOperationCallbackFactory.INSTANCE, datasetID), runtimeContextProvider
.getLSMIOScheduler(), runtimeContextProvider.getLSMBTreeIOOperationCallbackProvider(isPrimary));
return lsmBTree;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
index b3da3ee..308f004 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
@@ -15,6 +15,7 @@
package edu.uci.ics.asterix.transaction.management.resource;
import edu.uci.ics.asterix.common.context.BaseOperationTracker;
+import edu.uci.ics.asterix.common.context.DatasetLifecycleManager;
import edu.uci.ics.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -61,7 +62,8 @@
tokenCmpFactories, tokenizerFactory, runtimeContextProvider.getBufferCache(), filePath,
runtimeContextProvider.getBloomFilterFalsePositiveRate(), runtimeContextProvider
.getLSMMergePolicy(), new BaseOperationTracker(
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE), runtimeContextProvider
+ (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, datasetID), runtimeContextProvider
.getLSMIOScheduler(), runtimeContextProvider
.getLSMInvertedIndexIOOperationCallbackProvider());
} else {
@@ -70,7 +72,8 @@
tokenCmpFactories, tokenizerFactory, runtimeContextProvider.getBufferCache(), filePath,
runtimeContextProvider.getBloomFilterFalsePositiveRate(), runtimeContextProvider
.getLSMMergePolicy(), new BaseOperationTracker(
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE), runtimeContextProvider
+ (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, datasetID), runtimeContextProvider
.getLSMIOScheduler(), runtimeContextProvider
.getLSMInvertedIndexIOOperationCallbackProvider());
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
index 9b9faef..d64cf63 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
@@ -17,6 +17,7 @@
import java.io.File;
import edu.uci.ics.asterix.common.context.BaseOperationTracker;
+import edu.uci.ics.asterix.common.context.DatasetLifecycleManager;
import edu.uci.ics.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -64,8 +65,10 @@
runtimeContextProvider.getFileMapManager(), typeTraits, rtreeCmpFactories, btreeCmpFactories,
valueProviderFactories, rtreePolicyType, runtimeContextProvider.getBloomFilterFalsePositiveRate(),
runtimeContextProvider.getLSMMergePolicy(), new BaseOperationTracker(
- LSMRTreeIOOperationCallbackFactory.INSTANCE), runtimeContextProvider.getLSMIOScheduler(),
- runtimeContextProvider.getLSMRTreeIOOperationCallbackProvider(), linearizeCmpFactory);
+ (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
+ LSMRTreeIOOperationCallbackFactory.INSTANCE, datasetID), runtimeContextProvider
+ .getLSMIOScheduler(), runtimeContextProvider.getLSMRTreeIOOperationCallbackProvider(),
+ linearizeCmpFactory);
} catch (TreeIndexException e) {
throw new HyracksDataException(e);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index a612eff..0bc9462 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -396,9 +396,11 @@
logPages[pageIndex].setBufferNextWriteOffset(bufferNextWriteOffset);
if (logType != LogType.ENTITY_COMMIT) {
- if (logType == LogType.COMMIT) {
- map = activeTxnCountMaps.get(pageIndex);
- map.put(txnCtx, 1);
+ if (logType == LogType.COMMIT && txnCtx.isExlusiveJobLevelCommit()) {
+ synchronized (this) {
+ map = activeTxnCountMaps.get(pageIndex);
+ map.put(txnCtx, 1);
+ }
}
// release the ownership as the log record has been placed in
// created space.
@@ -409,13 +411,15 @@
}
if (logType == LogType.ENTITY_COMMIT) {
- map = activeTxnCountMaps.get(pageIndex);
- if (map.containsKey(txnCtx)) {
- activeTxnCount = (Integer) map.get(txnCtx);
- activeTxnCount++;
- map.put(txnCtx, activeTxnCount);
- } else {
- map.put(txnCtx, 1);
+ synchronized (this) {
+ map = activeTxnCountMaps.get(pageIndex);
+ if (map.containsKey(txnCtx)) {
+ activeTxnCount = (Integer) map.get(txnCtx);
+ activeTxnCount++;
+ map.put(txnCtx, activeTxnCount);
+ } else {
+ map.put(txnCtx, 1);
+ }
}
// ------------------------------------------------------------------------------
// [Notice]
@@ -712,7 +716,7 @@
static AtomicInteger t = new AtomicInteger();
- public void decrementActiveTxnCountOnIndexes(int pageIndex) throws HyracksDataException {
+ public synchronized void decrementActiveTxnCountOnIndexes(int pageIndex) throws HyracksDataException {
ITransactionContext ctx = null;
int count = 0;
int i = 0;
@@ -936,12 +940,11 @@
diskNextWriteOffset, flushPageIndex);
resetFlushPageIndex = true;
}
-
- // decrement activeTxnCountOnIndexes
- logManager.decrementActiveTxnCountOnIndexes(flushPageIndex);
} finally {
logManager.getLogPage(flushPageIndex).releaseWriteLatch();
}
+ // decrement activeTxnCountOnIndexes
+ logManager.decrementActiveTxnCountOnIndexes(flushPageIndex);
// #. checks the queue whether there is another flush
// request on the same log buffer
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
index 97f2477..59a8363 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
@@ -16,22 +16,16 @@
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
import edu.uci.ics.asterix.common.context.BaseOperationTracker;
-import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import edu.uci.ics.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
-import edu.uci.ics.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -39,33 +33,13 @@
import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactory;
public class AsterixRuntimeComponentsProvider implements IIndexLifecycleManagerProvider, IStorageManagerInterface,
- ILSMIOOperationSchedulerProvider, ILSMMergePolicyProvider, ILSMOperationTrackerProvider,
+ ILSMIOOperationSchedulerProvider, ILSMMergePolicyProvider,
ILSMIOOperationCallbackProvider {
private static final long serialVersionUID = 1L;
- private final ILSMIOOperationCallbackFactory ioOpCallbackFactory;
- private final boolean isSecondary;
-
- public static final AsterixRuntimeComponentsProvider LSMBTREE_PRIMARY_PROVIDER = new AsterixRuntimeComponentsProvider(
- LSMBTreeIOOperationCallbackFactory.INSTANCE, false);
- public static final AsterixRuntimeComponentsProvider LSMBTREE_SECONDARY_PROVIDER = new AsterixRuntimeComponentsProvider(
- LSMBTreeIOOperationCallbackFactory.INSTANCE, true);
- public static final AsterixRuntimeComponentsProvider LSMRTREE_PROVIDER = new AsterixRuntimeComponentsProvider(
- LSMRTreeIOOperationCallbackFactory.INSTANCE, true);
- public static final AsterixRuntimeComponentsProvider LSMINVERTEDINDEX_PROVIDER = new AsterixRuntimeComponentsProvider(
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, true);
- public static final AsterixRuntimeComponentsProvider NOINDEX_PROVIDER = new AsterixRuntimeComponentsProvider(null,
- false);
-
- private AsterixRuntimeComponentsProvider(ILSMIOOperationCallbackFactory ioOpCallbackFactory, boolean isSecondary) {
- this.ioOpCallbackFactory = ioOpCallbackFactory;
- this.isSecondary = isSecondary;
- }
-
- @Override
- public ILSMOperationTracker getOperationTracker(IHyracksTaskContext ctx) {
- assert isSecondary;
- return new BaseOperationTracker(ioOpCallbackFactory);
+ public static final AsterixRuntimeComponentsProvider RUNTIME_PROVIDER = new AsterixRuntimeComponentsProvider();
+
+ private AsterixRuntimeComponentsProvider() {
}
@Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index afc09da..432743a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -190,6 +190,11 @@
exlusiveJobLevelCommit = true;
}
+ @Override
+ public boolean isExlusiveJobLevelCommit() {
+ return exlusiveJobLevelCommit;
+ }
+
public String prettyPrint() {
StringBuilder sb = new StringBuilder();
sb.append("\n" + jobId + "\n");