Merge branch 'master' into salsubaiee/master_lsm
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 23f1d56..56fc0e7 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -20,6 +20,7 @@
import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
import edu.uci.ics.asterix.common.dataflow.IAsterixApplicationContextInfo;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
@@ -38,6 +39,7 @@
import edu.uci.ics.asterix.optimizer.rules.am.InvertedIndexAccessMethod;
import edu.uci.ics.asterix.optimizer.rules.am.InvertedIndexAccessMethod.SearchModifierType;
import edu.uci.ics.asterix.optimizer.rules.am.InvertedIndexJobGenParams;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -217,18 +219,18 @@
if (!isPartitioned) {
dataflowHelperFactory = new LSMInvertedIndexDataflowHelperFactory(
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate());
} else {
dataflowHelperFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate());
}
LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp = new LSMInvertedIndexSearchOperatorDescriptor(
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
index e56ed92..18a81e5 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
@@ -15,6 +15,7 @@
package edu.uci.ics.asterix.api.common;
import java.io.IOException;
+import java.util.List;
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
@@ -45,7 +46,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousScheduler;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AsynchronousScheduler;
import edu.uci.ics.hyracks.storage.common.buffercache.BufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
import edu.uci.ics.hyracks.storage.common.buffercache.DelayPageCleanerPolicy;
@@ -107,7 +108,8 @@
storageProperties.getBufferCachePageSize(), storageProperties.getBufferCacheNumPages(),
storageProperties.getBufferCacheMaxOpenFiles(), ncApplicationContext.getThreadFactory());
- lsmIOScheduler = SynchronousScheduler.INSTANCE;
+ AsynchronousScheduler.INSTANCE.init(ncApplicationContext.getThreadFactory());
+ lsmIOScheduler = AsynchronousScheduler.INSTANCE;
mergePolicy = new ConstantMergePolicy(storageProperties.getLSMIndexMergeThreshold(), this);
ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(
@@ -212,8 +214,8 @@
}
@Override
- public IVirtualBufferCache getVirtualBufferCache(int datasetID) {
- return indexLifecycleManager.getVirtualBufferCache(datasetID);
+ public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID) {
+ return indexLifecycleManager.getVirtualBufferCaches(datasetID);
}
@Override
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
index 712d993..ee9dfae 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.asterix.api.common;
+import java.util.List;
+
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
@@ -88,32 +90,28 @@
}
@Override
- public IVirtualBufferCache getVirtualBufferCache(int datasetID) {
- return asterixAppRuntimeContext.getVirtualBufferCache(datasetID);
+ public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID) {
+ return asterixAppRuntimeContext.getVirtualBufferCaches(datasetID);
}
@Override
public ILSMIOOperationCallbackProvider getNoOpIOOperationCallbackProvider() {
- return AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER;
+ return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
}
@Override
public ILSMIOOperationCallbackProvider getLSMBTreeIOOperationCallbackProvider(boolean isPrimary) {
- if (isPrimary) {
- return AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER;
- } else {
- return AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER;
- }
+ return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
}
@Override
public ILSMIOOperationCallbackProvider getLSMRTreeIOOperationCallbackProvider() {
- return AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER;
+ return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
}
@Override
public ILSMIOOperationCallbackProvider getLSMInvertedIndexIOOperationCallbackProvider() {
- return AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER;
+ return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
}
@Override
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
index a0872d2..9af6c4c 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
@@ -123,12 +123,12 @@
datasetName);
AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
IndexDropOperatorDescriptor primaryBtreeDrop = new IndexDropOperatorDescriptor(specPrimary,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate()));
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop,
splitsAndConstraint.second);
@@ -175,13 +175,12 @@
localResourceMetadata, LocalResource.LSMBTreeResource);
TreeIndexCreateOperatorDescriptor indexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
splitsAndConstraint.first, typeTraits, comparatorFactories, blooFilterKeyFields,
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER, storageProperties
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new PrimaryIndexOperationTrackerProvider(
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
.getBloomFilterFalsePositiveRate()), localResourceFactoryProvider,
NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
@@ -260,15 +259,15 @@
TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad;
if (!loadStmt.alreadySorted()) {
btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, splitsAndConstraint.first, typeTraits,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, splitsAndConstraint.first, typeTraits,
comparatorFactories, blooFilterKeyFields, fieldPermutation, GlobalConfig.DEFAULT_BTREE_FILL_FACTOR,
true, numElementsHint, true, new LSMBTreeDataflowHelperFactory(
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate()), NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeBulkLoad,
splitsAndConstraint.second);
@@ -286,15 +285,15 @@
spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
} else {
btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, splitsAndConstraint.first, typeTraits,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, splitsAndConstraint.first, typeTraits,
comparatorFactories, blooFilterKeyFields, fieldPermutation, GlobalConfig.DEFAULT_BTREE_FILL_FACTOR,
false, numElementsHint, true, new LSMBTreeDataflowHelperFactory(
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate()), NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeBulkLoad,
splitsAndConstraint.second);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
index 7bd6c69..af56894 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
@@ -18,10 +18,12 @@
import edu.uci.ics.asterix.common.config.OptimizationConfUtil;
import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledIndexDropStatement;
@@ -66,12 +68,12 @@
.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverseName, datasetName, indexName);
AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset
+ .getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate()));
AlgebricksPartitionConstraintHelper
.setPartitionConstraintInJobSpec(spec, btreeDrop, splitsAndConstraint.second);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
index 052368a..e3832d4 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
@@ -19,6 +19,8 @@
import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
import edu.uci.ics.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
@@ -60,13 +62,13 @@
localResourceMetadata, LocalResource.LSMBTreeResource);
TreeIndexCreateOperatorDescriptor secondaryIndexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
secondaryFileSplitProvider, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories,
secondaryBloomFilterKeyFields, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset
+ .getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate()), localResourceFactoryProvider,
NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
@@ -104,10 +106,10 @@
spec,
numSecondaryKeys,
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER, storageProperties
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
+ LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
.getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
// Connect the operators.
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
index 6717b4d..f716387 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
@@ -23,7 +23,11 @@
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import edu.uci.ics.asterix.common.context.ITransactionSubsystemProvider;
+import edu.uci.ics.asterix.common.context.TransactionSubsystemProvider;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.transactions.IResourceManager.ResourceType;
+import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.formats.nontagged.AqlBinaryBooleanInspectorImpl;
import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
@@ -39,8 +43,11 @@
import edu.uci.ics.asterix.runtime.evaluators.functions.AndDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.IsNullDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.NotDescriptor;
+import edu.uci.ics.asterix.runtime.job.listener.JobEventListenerFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
+import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
@@ -63,6 +70,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
@@ -71,6 +79,7 @@
import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
@@ -269,18 +278,26 @@
int[] lowKeyFields = null;
// +Infinity
int[] highKeyFields = null;
+ ITransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+ JobId jobId = JobIdFactory.generateJobId();
+ metadataProvider.setJobId(jobId);
+ boolean isWriteTransaction = metadataProvider.isWriteTransaction();
+ IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, isWriteTransaction);
+ spec.setJobletEventListenerFactory(jobEventListenerFactory);
+
+ ISearchOperationCallbackFactory searchCallbackFactory = new PrimaryIndexInstantSearchOperationCallbackFactory(
+ jobId, dataset.getDatasetId(), primaryBloomFilterKeyFields, txnSubsystemProvider,
+ ResourceType.LSM_BTREE);
AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
primaryFileSplitProvider, primaryRecDesc.getTypeTraits(), primaryComparatorFactories,
primaryBloomFilterKeyFields, lowKeyFields, highKeyFields, true, true,
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER, storageProperties
- .getBloomFilterFalsePositiveRate()), false,
- NoOpOperationCallbackFactory.INSTANCE);
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new PrimaryIndexOperationTrackerProvider(
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ storageProperties.getBloomFilterFalsePositiveRate()), false, searchCallbackFactory);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
primaryPartitionConstraint);
return primarySearchOp;
@@ -333,11 +350,10 @@
fieldPermutation[i] = i;
}
TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER, secondaryFileSplitProvider,
- secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, secondaryBloomFilterKeyFields,
- fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory,
- NoOpOperationCallbackFactory.INSTANCE);
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ secondaryFileSplitProvider, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories,
+ secondaryBloomFilterKeyFields, fieldPermutation, fillFactor, false, numElementsHint, false,
+ dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
secondaryPartitionConstraint);
return treeIndexBulkLoadOp;
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
index 17590c5..40e0aa9 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
@@ -22,11 +22,13 @@
import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
import edu.uci.ics.asterix.runtime.formats.FormatUtils;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
import edu.uci.ics.asterix.transaction.management.resource.LSMInvertedIndexLocalResourceMetadata;
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
@@ -176,8 +178,8 @@
IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory();
LSMInvertedIndexCreateOperatorDescriptor invIndexCreateOp = new LSMInvertedIndexCreateOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, secondaryFileSplitProvider,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, tokenTypeTraits, tokenComparatorFactories,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, tokenTypeTraits, tokenComparatorFactories,
invListsTypeTraits, primaryComparatorFactories, tokenizerFactory, dataflowHelperFactory,
localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexCreateOp,
@@ -267,8 +269,8 @@
IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory();
LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new LSMInvertedIndexBulkLoadOperatorDescriptor(
spec, fieldPermutation, false, numElementsHint, false,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, secondaryFileSplitProvider,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, tokenTypeTraits, tokenComparatorFactories,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, tokenTypeTraits, tokenComparatorFactories,
invListsTypeTraits, primaryComparatorFactories, tokenizerFactory, dataflowHelperFactory,
NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexBulkLoadOp,
@@ -280,17 +282,17 @@
AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
if (!isPartitioned) {
return new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate());
} else {
return new PartitionedLSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate());
}
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
index 17632aa..ec62068 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
@@ -21,6 +21,7 @@
import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
@@ -30,6 +31,7 @@
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
import edu.uci.ics.asterix.transaction.management.resource.LSMRTreeLocalResourceMetadata;
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
@@ -84,14 +86,14 @@
localResourceMetadata, LocalResource.LSMRTreeResource);
TreeIndexCreateOperatorDescriptor secondaryIndexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
secondaryFileSplitProvider, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, null,
new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, AqlMetadataProvider.proposeLinearizer(
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
+ LSMRTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AqlMetadataProvider.proposeLinearizer(
keyType, secondaryComparatorFactories.length), storageProperties
.getBloomFilterFalsePositiveRate()), localResourceFactoryProvider,
NoOpOperationCallbackFactory.INSTANCE);
@@ -174,10 +176,10 @@
numNestedSecondaryKeyFields,
new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, AqlMetadataProvider.proposeLinearizer(
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
+ LSMRTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AqlMetadataProvider.proposeLinearizer(
keyType, secondaryComparatorFactories.length), storageProperties
.getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
diff --git a/asterix-app/src/test/resources/runtimets/queries/dml/insert-and-scan-dataset-with-index/insert-and-scan-dataset-with-index.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/dml/insert-and-scan-dataset-with-index/insert-and-scan-dataset-with-index.2.update.aql
index cf7b309..29f6242 100644
--- a/asterix-app/src/test/resources/runtimets/queries/dml/insert-and-scan-dataset-with-index/insert-and-scan-dataset-with-index.2.update.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/dml/insert-and-scan-dataset-with-index/insert-and-scan-dataset-with-index.2.update.aql
@@ -8,8 +8,6 @@
use dataverse test;
-use dataverse test;
-
load dataset test.employee
using "edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter"
(("path"="nc1://data/names.adm"),("format"="delimited-text"),("delimiter"="|"));
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixAppRuntimeContext.java
index d035303..4287212 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -15,6 +15,7 @@
package edu.uci.ics.asterix.common.api;
import java.io.IOException;
+import java.util.List;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
@@ -62,5 +63,5 @@
public double getBloomFilterFalsePositiveRate();
- public IVirtualBufferCache getVirtualBufferCache(int datasetID);
+ public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID);
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixStorageProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixStorageProperties.java
index 85f90e4..d7ff15d 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixStorageProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixStorageProperties.java
@@ -31,6 +31,9 @@
private static final String STORAGE_MEMORYCOMPONENT_NUMPAGES_KEY = "storage.memorycomponent.numpages";
private static final int STORAGE_MEMORYCOMPONENT_NUMPAGES_DEFAULT = 1024; // ... so 32MB components
+ private static final String STORAGE_MEMORYCOMPONENT_NUMCOMPONENTS_KEY = "storage.memorycomponent.numcomponents";
+ private static final int STORAGE_MEMORYCOMPONENT_NUMCOMPONENTS_DEFAULT = 2; // 2 components
+
private static final String STORAGE_MEMORYCOMPONENT_GLOBALBUDGET_KEY = "storage.memorycomponent.globalbudget";
private static final long STORAGE_MEMORYCOMPONENT_GLOBALBUDGET_DEFAULT = 536870912; // 512MB
@@ -53,7 +56,7 @@
return accessor.getProperty(STORAGE_BUFFERCACHE_SIZE_KEY, STORAGE_BUFFERCACHE_SIZE_DEFAULT,
PropertyInterpreters.getLongPropertyInterpreter());
}
-
+
public int getBufferCacheNumPages() {
return (int) (getBufferCacheSize() / getBufferCachePageSize());
}
@@ -73,6 +76,11 @@
PropertyInterpreters.getIntegerPropertyInterpreter());
}
+ public int getMemoryComponentsNum() {
+ return accessor.getProperty(STORAGE_MEMORYCOMPONENT_NUMCOMPONENTS_KEY,
+ STORAGE_MEMORYCOMPONENT_NUMCOMPONENTS_DEFAULT, PropertyInterpreters.getIntegerPropertyInterpreter());
+ }
+
public long getMemoryComponentGlobalBudget() {
return accessor.getProperty(STORAGE_MEMORYCOMPONENT_GLOBALBUDGET_KEY,
STORAGE_MEMORYCOMPONENT_GLOBALBUDGET_DEFAULT, PropertyInterpreters.getLongPropertyInterpreter());
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixVirtualBufferCacheProvider.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixVirtualBufferCacheProvider.java
index bd2828d..9efc9fd 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixVirtualBufferCacheProvider.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixVirtualBufferCacheProvider.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.asterix.common.context;
+import java.util.List;
+
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
@@ -29,9 +31,9 @@
}
@Override
- public IVirtualBufferCache getVirtualBufferCache(IHyracksTaskContext ctx) {
+ public List<IVirtualBufferCache> getVirtualBufferCaches(IHyracksTaskContext ctx) {
return ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
- .getVirtualBufferCache(datasetID);
+ .getVirtualBufferCaches(datasetID);
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
index c48115c..3610478 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
@@ -26,13 +26,18 @@
public class BaseOperationTracker implements ILSMOperationTracker {
+ protected final DatasetLifecycleManager datasetLifecycleManager;
protected final ILSMIOOperationCallback ioOpCallback;
protected long lastLSN;
protected long firstLSN;
+ protected final int datasetID;
- public BaseOperationTracker(ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
+ public BaseOperationTracker(DatasetLifecycleManager datasetLifecycleManager,
+ ILSMIOOperationCallbackFactory ioOpCallbackFactory, int datasetID) {
+ this.datasetLifecycleManager = datasetLifecycleManager;
this.ioOpCallback = ioOpCallbackFactory == null ? NoOpIOOperationCallback.INSTANCE : ioOpCallbackFactory
.createIOOperationCallback(this);
+ this.datasetID = datasetID;
resetLSNs();
}
@@ -63,11 +68,17 @@
@Override
public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
+ if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+ datasetLifecycleManager.declareActiveIOOperation(datasetID);
+ }
}
@Override
public void afterOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
+ if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+ datasetLifecycleManager.undeclareActiveIOOperation(datasetID);
+ }
}
@Override
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java
index 3ffa73d..cf69bfe 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java
@@ -15,10 +15,13 @@
package edu.uci.ics.asterix.common.context;
+import java.util.List;
+
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
@@ -34,9 +37,10 @@
this.ctx = ctx;
}
- public void diskComponentAdded(final ILSMIndex index, int totalNumDiskComponents) throws HyracksDataException,
- IndexException {
- if (!ctx.isShuttingdown() && totalNumDiskComponents >= threshold) {
+ @Override
+ public void diskComponentAdded(final ILSMIndex index) throws HyracksDataException, IndexException {
+ List<ILSMComponent> immutableComponents = index.getImmutableComponents();
+ if (!ctx.isShuttingdown() && immutableComponents.size() >= threshold) {
ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
index 7232f01..eea3d99 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
@@ -31,7 +31,9 @@
import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.MultitenantVirtualBufferCache;
@@ -42,7 +44,7 @@
public class DatasetLifecycleManager implements IIndexLifecycleManager, ILifeCycleComponent {
private final AsterixStorageProperties storageProperties;
- private final Map<Integer, MultitenantVirtualBufferCache> datasetVirtualBufferCaches;
+ private final Map<Integer, List<IVirtualBufferCache>> datasetVirtualBufferCaches;
private final Map<Integer, ILSMOperationTracker> datasetOpTrackers;
private final Map<Integer, DatasetInfo> datasetInfos;
private final ILocalResourceRepository resourceRepository;
@@ -53,7 +55,7 @@
ILocalResourceRepository resourceRepository) {
this.storageProperties = storageProperties;
this.resourceRepository = resourceRepository;
- datasetVirtualBufferCaches = new HashMap<Integer, MultitenantVirtualBufferCache>();
+ datasetVirtualBufferCaches = new HashMap<Integer, List<IVirtualBufferCache>>();
datasetOpTrackers = new HashMap<Integer, ILSMOperationTracker>();
datasetInfos = new HashMap<Integer, DatasetInfo>();
capacity = storageProperties.getMemoryComponentGlobalBudget();
@@ -98,29 +100,77 @@
public synchronized void unregister(long resourceID) throws HyracksDataException {
int did = getDIDfromRID(resourceID);
DatasetInfo dsInfo = datasetInfos.get(did);
- IndexInfo iInfo = dsInfo.indexes.remove(resourceID);
+ IndexInfo iInfo = dsInfo.indexes.get(resourceID);
+
if (dsInfo == null || iInfo == null) {
throw new HyracksDataException("Index with resource ID " + resourceID + " does not exist.");
}
- if (iInfo.referenceCount != 0) {
- dsInfo.indexes.put(resourceID, iInfo);
+ PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) datasetOpTrackers.get(dsInfo.datasetID);
+ if (iInfo.referenceCount != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
throw new HyracksDataException("Cannot remove index while it is open.");
}
- if (iInfo.isOpen) {
- iInfo.index.deactivate(true);
+ // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
+
+ // First wait for any ongoing IO operations
+ while (dsInfo.numActiveIOOps > 0) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
}
+ if (iInfo.isOpen) {
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ accessor.scheduleFlush(((BaseOperationTracker) iInfo.index.getOperationTracker()).getIOOperationCallback());
+ }
+
+ // Then wait for above flush op.
+ // They are separated so they don't deadlock each other.
+ while (dsInfo.numActiveIOOps > 0) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ if (iInfo.isOpen) {
+ iInfo.index.deactivate(false);
+ }
+
+ dsInfo.indexes.remove(resourceID);
if (dsInfo.referenceCount == 0 && dsInfo.isOpen && dsInfo.indexes.isEmpty()) {
- IVirtualBufferCache vbc = getVirtualBufferCache(did);
- assert vbc != null;
- used -= (vbc.getNumPages() * vbc.getPageSize());
+ List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(did);
+ assert vbcs != null;
+ for (IVirtualBufferCache vbc : vbcs) {
+ used -= (vbc.getNumPages() * vbc.getPageSize());
+ }
datasetInfos.remove(did);
}
}
+ public synchronized void declareActiveIOOperation(int datasetID) throws HyracksDataException {
+ DatasetInfo dsInfo = datasetInfos.get(datasetID);
+ if (dsInfo == null) {
+ throw new HyracksDataException("Failed to find a dataset with ID " + datasetID);
+ }
+ dsInfo.incrementActiveIOOps();
+ }
+
+ public synchronized void undeclareActiveIOOperation(int datasetID) throws HyracksDataException {
+ DatasetInfo dsInfo = datasetInfos.get(datasetID);
+ if (dsInfo == null) {
+ throw new HyracksDataException("Failed to find a dataset with ID " + datasetID);
+ }
+ dsInfo.decrementActiveIOOps();
+ notifyAll();
+ }
+
@Override
public synchronized void open(long resourceID) throws HyracksDataException {
int did = getDIDfromRID(resourceID);
@@ -137,9 +187,12 @@
}
if (!dsInfo.isOpen) {
- IVirtualBufferCache vbc = getVirtualBufferCache(did);
- assert vbc != null;
- long additionalSize = vbc.getNumPages() * vbc.getPageSize();
+ List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(did);
+ assert vbcs != null;
+ long additionalSize = 0;
+ for (IVirtualBufferCache vbc : vbcs) {
+ additionalSize += vbc.getNumPages() * vbc.getPageSize();
+ }
while (used + additionalSize > capacity) {
if (!evictCandidateDataset()) {
throw new HyracksDataException("Cannot activate index since memory budget would be exceeded.");
@@ -165,23 +218,54 @@
List<DatasetInfo> datasetInfosList = new ArrayList<DatasetInfo>(datasetInfos.values());
Collections.sort(datasetInfosList);
for (DatasetInfo dsInfo : datasetInfosList) {
- ILSMOperationTracker opTracker = datasetOpTrackers.get(dsInfo.datasetID);
- if (opTracker != null && ((PrimaryIndexOperationTracker) opTracker).getNumActiveOperations() == 0
- && dsInfo.referenceCount == 0 && dsInfo.isOpen) {
+ PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) datasetOpTrackers
+ .get(dsInfo.datasetID);
+ if (opTracker != null && opTracker.getNumActiveOperations() == 0 && dsInfo.referenceCount == 0
+ && dsInfo.isOpen) {
+
+ // First wait for any ongoing IO operations
+ while (dsInfo.numActiveIOOps > 0) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
for (IndexInfo iInfo : dsInfo.indexes.values()) {
if (iInfo.isOpen) {
- iInfo.index.deactivate(true);
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) iInfo.index.createAccessor(
+ NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ accessor.scheduleFlush(((BaseOperationTracker) iInfo.index.getOperationTracker())
+ .getIOOperationCallback());
+ }
+ }
+ // Wait for the above flush ops.
+ while (dsInfo.numActiveIOOps > 0) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ for (IndexInfo iInfo : dsInfo.indexes.values()) {
+ if (iInfo.isOpen) {
+ iInfo.index.deactivate(false);
iInfo.isOpen = false;
}
assert iInfo.referenceCount == 0;
}
-
- IVirtualBufferCache vbc = getVirtualBufferCache(dsInfo.datasetID);
- used -= vbc.getNumPages() * vbc.getPageSize();
dsInfo.isOpen = false;
+
+ List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(dsInfo.datasetID);
+ for (IVirtualBufferCache vbc : vbcs) {
+ used -= vbc.getNumPages() * vbc.getPageSize();
+ }
return true;
+
}
}
+
return false;
}
@@ -213,15 +297,22 @@
return openIndexes;
}
- public IVirtualBufferCache getVirtualBufferCache(int datasetID) {
+ public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID) {
synchronized (datasetVirtualBufferCaches) {
- MultitenantVirtualBufferCache vbc = datasetVirtualBufferCaches.get(datasetID);
- if (vbc == null) {
- vbc = new MultitenantVirtualBufferCache(new VirtualBufferCache(new HeapBufferAllocator(),
- storageProperties.getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages()));
- datasetVirtualBufferCaches.put(datasetID, vbc);
+ List<IVirtualBufferCache> vbcs = datasetVirtualBufferCaches.get(datasetID);
+ if (vbcs == null) {
+ vbcs = new ArrayList<IVirtualBufferCache>();
+ for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
+ MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(
+ new VirtualBufferCache(new HeapBufferAllocator(),
+ storageProperties.getMemoryComponentPageSize(),
+ storageProperties.getMemoryComponentNumPages()
+ / storageProperties.getMemoryComponentsNum()));
+ vbcs.add(vbc);
+ }
+ datasetVirtualBufferCaches.put(datasetID, vbcs);
}
- return vbc;
+ return vbcs;
}
}
@@ -245,7 +336,9 @@
}
Set<ILSMIndex> datasetIndexes = new HashSet<ILSMIndex>();
for (IndexInfo iInfo : dsInfo.indexes.values()) {
- datasetIndexes.add(iInfo.index);
+ if (iInfo.isOpen) {
+ datasetIndexes.add(iInfo.index);
+ }
}
return datasetIndexes;
}
@@ -280,6 +373,7 @@
private final Map<Long, IndexInfo> indexes;
private final int datasetID;
private long lastAccess;
+ private int numActiveIOOps;
public DatasetInfo(int datasetID) {
this.indexes = new HashMap<Long, IndexInfo>();
@@ -297,6 +391,14 @@
lastAccess = System.currentTimeMillis();
}
+ public void incrementActiveIOOps() {
+ numActiveIOOps++;
+ }
+
+ public void decrementActiveIOOps() {
+ numActiveIOOps--;
+ }
+
@Override
public int compareTo(DatasetInfo i) {
// sort by (isOpen, referenceCount, lastAccess) ascending, where true < false
@@ -376,4 +478,4 @@
}
outputStream.write(sb.toString().getBytes());
}
-}
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
index 53b9192..69dba5e 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -18,7 +18,6 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
-import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
@@ -26,35 +25,27 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexInternal;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
public class PrimaryIndexOperationTracker extends BaseOperationTracker {
- private final DatasetLifecycleManager datasetLifecycleManager;
- private final IVirtualBufferCache datasetBufferCache;
- private final int datasetID;
// Number of active operations on a ILSMIndex instance.
private AtomicInteger numActiveOperations;
public PrimaryIndexOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID,
ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
- super(ioOpCallbackFactory);
- this.datasetLifecycleManager = datasetLifecycleManager;
- this.numActiveOperations = new AtomicInteger(0);
- this.datasetID = datasetID;
- datasetBufferCache = datasetLifecycleManager.getVirtualBufferCache(datasetID);
+ super(datasetLifecycleManager, ioOpCallbackFactory, datasetID);
+ this.numActiveOperations = new AtomicInteger();
}
@Override
public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
- numActiveOperations.incrementAndGet();
-
- // Increment transactor-local active operations count.
- AbstractOperationCallback opCallback = getOperationCallback(searchCallback, modificationCallback);
- if (opCallback != null) {
- opCallback.incrementLocalNumActiveOperations();
+ if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
+ numActiveOperations.incrementAndGet();
+ } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+ datasetLifecycleManager.declareActiveIOOperation(datasetID);
}
}
@@ -62,8 +53,7 @@
public void afterOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
// Searches are immediately considered complete, because they should not prevent the execution of flushes.
- if ((searchCallback != null && searchCallback != NoOpOperationCallback.INSTANCE)
- || opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+ if (opType == LSMOperationType.SEARCH || opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
completeOperation(index, opType, searchCallback, modificationCallback);
}
}
@@ -71,50 +61,45 @@
@Override
public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
- int nActiveOps = numActiveOperations.decrementAndGet();
- // Decrement transactor-local active operations count.
- AbstractOperationCallback opCallback = getOperationCallback(searchCallback, modificationCallback);
- if (opCallback != null) {
- opCallback.decrementLocalNumActiveOperations();
+ if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
+ numActiveOperations.decrementAndGet();
+ } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+ datasetLifecycleManager.undeclareActiveIOOperation(datasetID);
}
- if (opType != LSMOperationType.FLUSH) {
- flushIfFull(nActiveOps);
+
+ if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
+ flushIfFull();
}
}
- private void flushIfFull(int nActiveOps) throws HyracksDataException {
- // If we need a flush, and this is the last completing operation, then schedule the flush.
- if (datasetBufferCache.isFull() && nActiveOps == 0) {
- Set<ILSMIndex> indexes = datasetLifecycleManager.getDatasetIndexes(datasetID);
- for (ILSMIndex lsmIndex : indexes) {
- ILSMIndexAccessor accessor = (ILSMIndexAccessor) lsmIndex.createAccessor(
- NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- accessor.scheduleFlush(((BaseOperationTracker) lsmIndex.getOperationTracker()).getIOOperationCallback());
+ private void flushIfFull() throws HyracksDataException {
+ Set<ILSMIndex> indexes = datasetLifecycleManager.getDatasetIndexes(datasetID);
+ // If we need a flush, and this is the last completing operation, then schedule the flush.
+ boolean needsFlush = false;
+ for (ILSMIndex lsmIndex : indexes) {
+ if (((ILSMIndexInternal)lsmIndex).hasFlushRequestForCurrentMutableComponent()) {
+ needsFlush = true;
+ break;
}
-
+ }
+ synchronized (this) {
+ if (needsFlush && numActiveOperations.get() == 0) {
+ for (ILSMIndex lsmIndex : indexes) {
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) lsmIndex.createAccessor(
+ NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ accessor.scheduleFlush(((BaseOperationTracker) lsmIndex.getOperationTracker())
+ .getIOOperationCallback());
+ }
+ }
}
}
public void exclusiveJobCommitted() throws HyracksDataException {
numActiveOperations.set(0);
- flushIfFull(0);
- }
-
- private AbstractOperationCallback getOperationCallback(ISearchOperationCallback searchCallback,
- IModificationOperationCallback modificationCallback) {
-
- if (modificationCallback == NoOpOperationCallback.INSTANCE || modificationCallback == null) {
- return null;
- }
- if (searchCallback != null && searchCallback != NoOpOperationCallback.INSTANCE) {
- return (AbstractOperationCallback) searchCallback;
- } else {
- return (AbstractOperationCallback) modificationCallback;
- }
+ flushIfFull();
}
public int getNumActiveOperations() {
return numActiveOperations.get();
}
-
-}
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index 129a1a7..bc19543 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -47,7 +47,6 @@
if (tupleFilter != null) {
frameTuple.reset(accessor, i);
if (!tupleFilter.accept(frameTuple)) {
- lsmAccessor.noOp();
continue;
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
index 40377f4..b136e95 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -20,7 +20,7 @@
import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTreeImmutableComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTreeDiskComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
public class LSMBTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
@@ -33,7 +33,7 @@
public void afterOperation(List<ILSMComponent> oldComponents, ILSMComponent newComponent)
throws HyracksDataException {
if (oldComponents != null && newComponent != null) {
- LSMBTreeImmutableComponent btreeComponent = (LSMBTreeImmutableComponent) newComponent;
+ LSMBTreeDiskComponent btreeComponent = (LSMBTreeDiskComponent) newComponent;
putLSNIntoMetadata(btreeComponent.getBTree(), oldComponents);
}
}
@@ -47,7 +47,7 @@
// Get max LSN from the oldComponents. Implies a merge IO operation.
long maxLSN = -1;
for (ILSMComponent c : oldComponents) {
- BTree btree = ((LSMBTreeImmutableComponent) c).getBTree();
+ BTree btree = ((LSMBTreeDiskComponent) c).getBTree();
maxLSN = Math.max(getTreeIndexLSN(btree), maxLSN);
}
return maxLSN;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index eb9878c..655ace2 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -20,7 +20,7 @@
import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexImmutableComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexDiskComponent;
public class LSMInvertedIndexIOOperationCallback extends AbstractLSMIOOperationCallback {
@@ -32,7 +32,7 @@
public void afterOperation(List<ILSMComponent> oldComponents, ILSMComponent newComponent)
throws HyracksDataException {
if (oldComponents != null && newComponent != null) {
- LSMInvertedIndexImmutableComponent invIndexComponent = (LSMInvertedIndexImmutableComponent) newComponent;
+ LSMInvertedIndexDiskComponent invIndexComponent = (LSMInvertedIndexDiskComponent) newComponent;
putLSNIntoMetadata(invIndexComponent.getDeletedKeysBTree(), oldComponents);
}
}
@@ -46,7 +46,7 @@
// Get max LSN from the oldComponents. Implies a merge IO operation.
long maxLSN = -1;
for (Object o : oldComponents) {
- LSMInvertedIndexImmutableComponent invIndexComponent = (LSMInvertedIndexImmutableComponent) o;
+ LSMInvertedIndexDiskComponent invIndexComponent = (LSMInvertedIndexDiskComponent) o;
maxLSN = Math.max(getTreeIndexLSN(invIndexComponent.getDeletedKeysBTree()), maxLSN);
}
return maxLSN;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
index 63016f1..a26bb19 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -20,7 +20,7 @@
import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeImmutableComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeDiskComponent;
public class LSMRTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
@@ -32,7 +32,7 @@
public void afterOperation(List<ILSMComponent> oldComponents, ILSMComponent newComponent)
throws HyracksDataException {
if (oldComponents != null && newComponent != null) {
- LSMRTreeImmutableComponent rtreeComponent = (LSMRTreeImmutableComponent) newComponent;
+ LSMRTreeDiskComponent rtreeComponent = (LSMRTreeDiskComponent) newComponent;
putLSNIntoMetadata(rtreeComponent.getRTree(), oldComponents);
putLSNIntoMetadata(rtreeComponent.getBTree(), oldComponents);
}
@@ -47,7 +47,7 @@
// Get max LSN from the oldComponents. Implies a merge IO operation.
long maxLSN = -1;
for (Object o : oldComponents) {
- LSMRTreeImmutableComponent rtreeComponent = (LSMRTreeImmutableComponent) o;
+ LSMRTreeDiskComponent rtreeComponent = (LSMRTreeDiskComponent) o;
maxLSN = Math.max(getTreeIndexLSN(rtreeComponent.getRTree()), maxLSN);
}
return maxLSN;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java
index c951826..d4b26f7 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java
@@ -15,8 +15,6 @@
package edu.uci.ics.asterix.common.transactions;
-import java.util.concurrent.atomic.AtomicInteger;
-
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit;
@@ -28,7 +26,6 @@
protected final int[] primaryKeyFields;
protected final ITransactionContext txnCtx;
protected final ILockManager lockManager;
- protected final AtomicInteger transactorLocalNumActiveOperations;
protected final long[] longHashes;
public AbstractOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
@@ -37,7 +34,6 @@
this.primaryKeyFields = primaryKeyFields;
this.txnCtx = txnCtx;
this.lockManager = lockManager;
- this.transactorLocalNumActiveOperations = new AtomicInteger(0);
this.longHashes = new long[2];
}
@@ -45,16 +41,4 @@
MurmurHash128Bit.hash3_x64_128(tuple, primaryKeyFields, SEED, longHashes);
return Math.abs((int) longHashes[0]);
}
-
- public int getLocalNumActiveOperations() {
- return transactorLocalNumActiveOperations.get();
- }
-
- public void incrementLocalNumActiveOperations() {
- transactorLocalNumActiveOperations.incrementAndGet();
- }
-
- public void decrementLocalNumActiveOperations() {
- transactorLocalNumActiveOperations.decrementAndGet();
- }
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
index 05ac025..2f522b9 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.asterix.common.transactions;
+import java.util.List;
+
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
@@ -58,5 +60,5 @@
public IIOManager getIOManager();
- public IVirtualBufferCache getVirtualBufferCache(int datasetID);
+ public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID);
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IRecoveryManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IRecoveryManager.java
index be41a6b..21a02ce 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IRecoveryManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IRecoveryManager.java
@@ -17,7 +17,7 @@
import java.io.IOException;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
/**
* Provides API for failure recovery. Failure could be at application level and
@@ -69,5 +69,5 @@
*/
public void rollbackTransaction(ITransactionContext txnContext) throws ACIDException;
- public void checkpoint(boolean isSharpCheckpoint) throws ACIDException;
+ public void checkpoint(boolean isSharpCheckpoint) throws ACIDException, HyracksDataException;
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
index 3ebb963..31f3e64 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
@@ -27,8 +27,6 @@
public void decreaseActiveTransactionCountOnIndexes() throws HyracksDataException;
- public int getActiveOperationCountOnIndexes() throws HyracksDataException;
-
public LogicalLogLocator getFirstLogLocator();
public LogicalLogLocator getLastLogLocator();
@@ -70,5 +68,7 @@
}
public void setExclusiveJobLevelCommit();
+
+ public boolean isExlusiveJobLevelCommit();
}
diff --git a/asterix-common/src/test/java/edu/uci/ics/asterix/test/aql/TestsUtils.java b/asterix-common/src/test/java/edu/uci/ics/asterix/test/aql/TestsUtils.java
index 88c65ff..861cce5 100644
--- a/asterix-common/src/test/java/edu/uci/ics/asterix/test/aql/TestsUtils.java
+++ b/asterix-common/src/test/java/edu/uci/ics/asterix/test/aql/TestsUtils.java
@@ -325,6 +325,8 @@
List<CompilationUnit> cUnits = testCaseCtx.getTestCase().getCompilationUnit();
for (CompilationUnit cUnit : cUnits) {
+ LOGGER.info("Starting [TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/"
+ + cUnit.getName() + " ... ");
testFileCtxs = testCaseCtx.getTestFiles(cUnit);
expectedResultFileCtxs = testCaseCtx.getExpectedResultFiles(cUnit);
diff --git a/asterix-installer/src/main/resources/conf/asterix-configuration.xml b/asterix-installer/src/main/resources/conf/asterix-configuration.xml
index 6b3133c..deeb5b0 100644
--- a/asterix-installer/src/main/resources/conf/asterix-configuration.xml
+++ b/asterix-installer/src/main/resources/conf/asterix-configuration.xml
@@ -68,6 +68,14 @@
</description>
</property>
+ <property>
+ <name>storage.memorycomponent.numcomponents</name>
+ <value>2</value>
+ <description>The number of memory components to be used per lsm index.
+ (Default = 2)
+ </description>
+ </property>
+
<property>
<name>storage.memorycomponent.globalbudget</name>
<value>536870192</value>
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index b980337..f742a64 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -115,6 +115,7 @@
@Override
public void commitTransaction(JobId jobId) throws RemoteException, ACIDException {
ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+ txnCtx.setExclusiveJobLevelCommit();
transactionSubsystem.getTransactionManager().commitTransaction(txnCtx, new DatasetId(-1), -1);
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index aa976f8..4f67cb5 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -33,6 +33,7 @@
import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
import edu.uci.ics.asterix.common.context.BaseOperationTracker;
+import edu.uci.ics.asterix.common.context.DatasetLifecycleManager;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import edu.uci.ics.asterix.common.transactions.IResourceManager.ResourceType;
@@ -333,18 +334,19 @@
+ IndexFileNameUtil.prepareFileName(metadataStore + File.separator + index.getFileNameRelativePath(),
runtimeContext.getMetaDataIODeviceId());
FileReference file = new FileReference(new File(filePath));
- IVirtualBufferCache virtualBufferCache = runtimeContext.getVirtualBufferCache(index.getDatasetId().getId());
+ List<IVirtualBufferCache> virtualBufferCaches = runtimeContext.getVirtualBufferCaches(index.getDatasetId()
+ .getId());
ITypeTraits[] typeTraits = index.getTypeTraits();
IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
int[] bloomFilterKeyFields = index.getBloomFilterKeyFields();
LSMBTree lsmBtree = null;
long resourceID = -1;
- AsterixRuntimeComponentsProvider rtcProvider = index.isPrimaryIndex() ? AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER
- : AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER;
+ AsterixRuntimeComponentsProvider rtcProvider = AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
ILSMOperationTracker opTracker = index.isPrimaryIndex() ? runtimeContext.getLSMBTreeOperationTracker(index
- .getDatasetId().getId()) : new BaseOperationTracker(LSMBTreeIOOperationCallbackFactory.INSTANCE);
+ .getDatasetId().getId()) : new BaseOperationTracker((DatasetLifecycleManager) indexLifecycleManager,
+ LSMBTreeIOOperationCallbackFactory.INSTANCE, index.getDatasetId().getId());
if (create) {
- lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCache, file, bufferCache, fileMapProvider, typeTraits,
+ lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider, typeTraits,
comparatorFactories, bloomFilterKeyFields, runtimeContext.getBloomFilterFalsePositiveRate(),
runtimeContext.getLSMMergePolicy(), opTracker, runtimeContext.getLSMIOScheduler(), rtcProvider);
lsmBtree.create();
@@ -361,11 +363,11 @@
resourceID = localResourceRepository.getResourceByName(file.getFile().getPath()).getResourceId();
lsmBtree = (LSMBTree) indexLifecycleManager.getIndex(resourceID);
if (lsmBtree == null) {
- lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCache, file, bufferCache, fileMapProvider,
+ lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider,
typeTraits, comparatorFactories, bloomFilterKeyFields,
runtimeContext.getBloomFilterFalsePositiveRate(), runtimeContext.getLSMMergePolicy(),
opTracker, runtimeContext.getLSMIOScheduler(),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER);
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
indexLifecycleManager.register(resourceID, lsmBtree);
}
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 1ade7ef..a5b6523 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -33,6 +33,9 @@
import edu.uci.ics.asterix.common.dataflow.AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor;
import edu.uci.ics.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
import edu.uci.ics.asterix.common.dataflow.IAsterixApplicationContextInfo;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
import edu.uci.ics.asterix.common.parse.IParseFileSplitsDecl;
import edu.uci.ics.asterix.common.transactions.IResourceManager.ResourceType;
import edu.uci.ics.asterix.common.transactions.JobId;
@@ -81,6 +84,7 @@
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexSearchOperationCallbackFactory;
import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -569,14 +573,14 @@
primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
}
}
- AsterixRuntimeComponentsProvider rtcProvider = isSecondary ? AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER
- : AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER;
+ AsterixRuntimeComponentsProvider rtcProvider = AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), spPc.first,
typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields,
lowKeyInclusive, highKeyInclusive, new LSMBTreeDataflowHelperFactory(
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), rtcProvider,
- isSecondary ? AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER
+ isSecondary ? new SecondaryIndexOperationTrackerProvider(
+ LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId())
: new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()), rtcProvider,
rtcProvider, storageProperties.getBloomFilterFalsePositiveRate()), retainInput,
searchCallbackFactory);
@@ -644,10 +648,10 @@
typeTraits, comparatorFactories, keyFields, new LSMRTreeDataflowHelperFactory(
valueProviderFactories, RTreePolicyType.RTREE, primaryComparatorFactories,
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, proposeLinearizer(
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(LSMRTreeIOOperationCallbackFactory.INSTANCE,
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, proposeLinearizer(
nestedKeyType.getTypeTag(), comparatorFactories.length),
storageProperties.getBloomFilterFalsePositiveRate()), retainInput, searchCallbackFactory);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeSearchOp, spPc.second);
@@ -802,10 +806,10 @@
splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation,
GlobalConfig.DEFAULT_BTREE_FILL_FACTOR, false, numElementsHint, true,
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate()), NoOpOperationCallbackFactory.INSTANCE);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
splitsAndConstraint.second);
@@ -870,10 +874,10 @@
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp,
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER, storageProperties
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
.getBloomFilterFalsePositiveRate()), null, modificationCallbackFactory, true);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(insertDeleteOp,
@@ -1066,10 +1070,10 @@
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp,
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER, storageProperties
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(LSMBTreeIOOperationCallbackFactory.INSTANCE,
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
.getBloomFilterFalsePositiveRate()), filterFactory, modificationCallbackFactory,
false);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
@@ -1194,10 +1198,11 @@
appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER, storageProperties
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
.getBloomFilterFalsePositiveRate()), filterFactory, modificationCallbackFactory);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(insertDeleteOp,
splitsAndConstraint.second);
@@ -1288,10 +1293,10 @@
comparatorFactories, null, fieldPermutation, indexOp, new LSMRTreeDataflowHelperFactory(
valueProviderFactories, RTreePolicyType.RTREE, primaryComparatorFactories,
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, proposeLinearizer(
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(LSMRTreeIOOperationCallbackFactory.INSTANCE,
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, proposeLinearizer(
nestedKeyType.getTypeTag(), comparatorFactories.length),
storageProperties.getBloomFilterFalsePositiveRate()), filterFactory,
modificationCallbackFactory, false);
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixAppContextInfo.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixAppContextInfo.java
index 6cdaea0..0f836af 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixAppContextInfo.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixAppContextInfo.java
@@ -100,11 +100,11 @@
@Override
public IIndexLifecycleManagerProvider getIndexLifecycleManagerProvider() {
- return AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER;
+ return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
}
@Override
public IStorageManagerInterface getStorageManagerInterface() {
- return AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER;
+ return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
index 3661537..aec378b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
@@ -14,7 +14,9 @@
*/
package edu.uci.ics.asterix.transaction.management.opcallbacks;
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
import edu.uci.ics.asterix.common.context.BaseOperationTracker;
+import edu.uci.ics.asterix.common.context.DatasetLifecycleManager;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -24,15 +26,19 @@
private static final long serialVersionUID = 1L;
+ private final int datasetID;
private final ILSMIOOperationCallbackFactory ioOpCallbackFactory;
- public SecondaryIndexOperationTrackerProvider(ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
+ public SecondaryIndexOperationTrackerProvider(ILSMIOOperationCallbackFactory ioOpCallbackFactory, int datasetID) {
this.ioOpCallbackFactory = ioOpCallbackFactory;
+ this.datasetID = datasetID;
}
@Override
public ILSMOperationTracker getOperationTracker(IHyracksTaskContext ctx) {
- return new BaseOperationTracker(ioOpCallbackFactory);
+ DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) ((IAsterixAppRuntimeContext) ctx
+ .getJobletContext().getApplicationContext().getApplicationObject()).getIndexLifecycleManager();
+ return new BaseOperationTracker(dslcManager, ioOpCallbackFactory, datasetID);
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
index 8ce0174..d243dd2 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
@@ -15,8 +15,10 @@
package edu.uci.ics.asterix.transaction.management.resource;
import java.io.File;
+import java.util.List;
import edu.uci.ics.asterix.common.context.BaseOperationTracker;
+import edu.uci.ics.asterix.common.context.DatasetLifecycleManager;
import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -49,12 +51,13 @@
public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider runtimeContextProvider, String filePath,
int partition) {
FileReference file = new FileReference(new File(filePath));
- IVirtualBufferCache virtualBufferCache = runtimeContextProvider.getVirtualBufferCache(datasetID);
- LSMBTree lsmBTree = LSMBTreeUtils.createLSMTree(virtualBufferCache, file, runtimeContextProvider
+ List<IVirtualBufferCache> virtualBufferCaches = runtimeContextProvider.getVirtualBufferCaches(datasetID);
+ LSMBTree lsmBTree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, runtimeContextProvider
.getBufferCache(), runtimeContextProvider.getFileMapManager(), typeTraits, cmpFactories,
bloomFilterKeyFields, runtimeContextProvider.getBloomFilterFalsePositiveRate(), runtimeContextProvider
.getLSMMergePolicy(), isPrimary ? runtimeContextProvider.getLSMBTreeOperationTracker(datasetID)
- : new BaseOperationTracker(LSMBTreeIOOperationCallbackFactory.INSTANCE), runtimeContextProvider
+ : new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
+ LSMBTreeIOOperationCallbackFactory.INSTANCE, datasetID), runtimeContextProvider
.getLSMIOScheduler(), runtimeContextProvider.getLSMBTreeIOOperationCallbackProvider(isPrimary));
return lsmBTree;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
index b3da3ee..8482172 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
@@ -14,7 +14,10 @@
*/
package edu.uci.ics.asterix.transaction.management.resource;
+import java.util.List;
+
import edu.uci.ics.asterix.common.context.BaseOperationTracker;
+import edu.uci.ics.asterix.common.context.DatasetLifecycleManager;
import edu.uci.ics.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -53,24 +56,26 @@
@Override
public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider runtimeContextProvider, String filePath,
int partition) throws HyracksDataException {
- IVirtualBufferCache virtualBufferCache = runtimeContextProvider.getVirtualBufferCache(datasetID);
+ List<IVirtualBufferCache> virtualBufferCaches = runtimeContextProvider.getVirtualBufferCaches(datasetID);
try {
if (isPartitioned) {
- return InvertedIndexUtils.createPartitionedLSMInvertedIndex(virtualBufferCache, runtimeContextProvider
+ return InvertedIndexUtils.createPartitionedLSMInvertedIndex(virtualBufferCaches, runtimeContextProvider
.getFileMapManager(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
tokenCmpFactories, tokenizerFactory, runtimeContextProvider.getBufferCache(), filePath,
runtimeContextProvider.getBloomFilterFalsePositiveRate(), runtimeContextProvider
.getLSMMergePolicy(), new BaseOperationTracker(
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE), runtimeContextProvider
+ (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, datasetID), runtimeContextProvider
.getLSMIOScheduler(), runtimeContextProvider
.getLSMInvertedIndexIOOperationCallbackProvider());
} else {
- return InvertedIndexUtils.createLSMInvertedIndex(virtualBufferCache, runtimeContextProvider
+ return InvertedIndexUtils.createLSMInvertedIndex(virtualBufferCaches, runtimeContextProvider
.getFileMapManager(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
tokenCmpFactories, tokenizerFactory, runtimeContextProvider.getBufferCache(), filePath,
runtimeContextProvider.getBloomFilterFalsePositiveRate(), runtimeContextProvider
.getLSMMergePolicy(), new BaseOperationTracker(
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE), runtimeContextProvider
+ (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, datasetID), runtimeContextProvider
.getLSMIOScheduler(), runtimeContextProvider
.getLSMInvertedIndexIOOperationCallbackProvider());
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
index 9b9faef..bc1e889 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
@@ -15,8 +15,10 @@
package edu.uci.ics.asterix.transaction.management.resource;
import java.io.File;
+import java.util.List;
import edu.uci.ics.asterix.common.context.BaseOperationTracker;
+import edu.uci.ics.asterix.common.context.DatasetLifecycleManager;
import edu.uci.ics.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -58,14 +60,16 @@
public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider runtimeContextProvider, String filePath,
int partition) throws HyracksDataException {
FileReference file = new FileReference(new File(filePath));
- IVirtualBufferCache virtualBufferCache = runtimeContextProvider.getVirtualBufferCache(datasetID);
+ List<IVirtualBufferCache> virtualBufferCaches = runtimeContextProvider.getVirtualBufferCaches(datasetID);
try {
- return LSMRTreeUtils.createLSMTree(virtualBufferCache, file, runtimeContextProvider.getBufferCache(),
+ return LSMRTreeUtils.createLSMTree(virtualBufferCaches, file, runtimeContextProvider.getBufferCache(),
runtimeContextProvider.getFileMapManager(), typeTraits, rtreeCmpFactories, btreeCmpFactories,
valueProviderFactories, rtreePolicyType, runtimeContextProvider.getBloomFilterFalsePositiveRate(),
runtimeContextProvider.getLSMMergePolicy(), new BaseOperationTracker(
- LSMRTreeIOOperationCallbackFactory.INSTANCE), runtimeContextProvider.getLSMIOScheduler(),
- runtimeContextProvider.getLSMRTreeIOOperationCallbackProvider(), linearizeCmpFactory);
+ (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
+ LSMRTreeIOOperationCallbackFactory.INSTANCE, datasetID), runtimeContextProvider
+ .getLSMIOScheduler(), runtimeContextProvider.getLSMRTreeIOOperationCallbackProvider(),
+ linearizeCmpFactory);
} catch (TreeIndexException e) {
throw new HyracksDataException(e);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 5b10144..0bc9462 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -53,7 +53,7 @@
public class LogManager implements ILogManager, ILifeCycleComponent {
- public static final boolean IS_DEBUG_MODE = false;//true
+ public static final boolean IS_DEBUG_MODE = false;// true
private static final Logger LOGGER = Logger.getLogger(LogManager.class.getName());
private final TransactionSubsystem provider;
private LogManagerProperties logManagerProperties;
@@ -396,10 +396,11 @@
logPages[pageIndex].setBufferNextWriteOffset(bufferNextWriteOffset);
if (logType != LogType.ENTITY_COMMIT) {
- if (logType == LogType.COMMIT) {
- txnCtx.setExclusiveJobLevelCommit();
- map = activeTxnCountMaps.get(pageIndex);
- map.put(txnCtx, 1);
+ if (logType == LogType.COMMIT && txnCtx.isExlusiveJobLevelCommit()) {
+ synchronized (this) {
+ map = activeTxnCountMaps.get(pageIndex);
+ map.put(txnCtx, 1);
+ }
}
// release the ownership as the log record has been placed in
// created space.
@@ -410,19 +411,23 @@
}
if (logType == LogType.ENTITY_COMMIT) {
- map = activeTxnCountMaps.get(pageIndex);
- if (map.containsKey(txnCtx)) {
- activeTxnCount = (Integer) map.get(txnCtx);
- activeTxnCount++;
- map.put(txnCtx, activeTxnCount);
- } else {
- map.put(txnCtx, 1);
+ synchronized (this) {
+ map = activeTxnCountMaps.get(pageIndex);
+ if (map.containsKey(txnCtx)) {
+ activeTxnCount = (Integer) map.get(txnCtx);
+ activeTxnCount++;
+ map.put(txnCtx, activeTxnCount);
+ } else {
+ map.put(txnCtx, 1);
+ }
}
- //------------------------------------------------------------------------------
+ // ------------------------------------------------------------------------------
// [Notice]
- // reference count should be decremented
- // after activeTxnCount is incremented, but before addFlushRequest() is called.
- //------------------------------------------------------------------------------
+ // reference count should be decremented
+ // after activeTxnCount is incremented, but before
+ // addFlushRequest() is called.
+ // ------------------------------------------------------------------------------
+
// release the ownership as the log record has been placed in
// created space.
logPages[pageIndex].decRefCnt();
@@ -443,7 +448,7 @@
System.out.println("--------------> LSN(" + currentLSN + ") is written");
}
- //collect statistics
+ // collect statistics
statLogSize += totalLogSize;
statLogCount++;
@@ -711,7 +716,7 @@
static AtomicInteger t = new AtomicInteger();
- public void decrementActiveTxnCountOnIndexes(int pageIndex) throws HyracksDataException {
+ public synchronized void decrementActiveTxnCountOnIndexes(int pageIndex) throws HyracksDataException {
ITransactionContext ctx = null;
int count = 0;
int i = 0;
@@ -739,22 +744,22 @@
@Override
public void start() {
- //no op
+ // no op
}
@Override
public void stop(boolean dumpState, OutputStream os) {
if (dumpState) {
- //#. dump Configurable Variables
+ // #. dump Configurable Variables
dumpConfVars(os);
- //#. dump LSNInfo
+ // #. dump LSNInfo
dumpLSNInfo(os);
try {
os.flush();
} catch (IOException e) {
- //ignore
+ // ignore
}
}
}
@@ -767,7 +772,7 @@
sb.append("\n>>dump_end\t>>----- [ConfVars] -----\n");
os.write(sb.toString().getBytes());
} catch (Exception e) {
- //ignore exception and continue dumping as much as possible.
+ // ignore exception and continue dumping as much as possible.
if (IS_DEBUG_MODE) {
e.printStackTrace();
}
@@ -784,7 +789,7 @@
sb.append("\n>>dump_end\t>>----- [LSNInfo] -----\n");
os.write(sb.toString().getBytes());
} catch (Exception e) {
- //ignore exception and continue dumping as much as possible.
+ // ignore exception and continue dumping as much as possible.
if (IS_DEBUG_MODE) {
e.printStackTrace();
}
@@ -893,8 +898,7 @@
}
continue;
}
-
- //if the log page is already full, don't wait.
+ // if the log page is already full, don't wait.
if (logManager.getLogPage(flushPageIndex).getBufferNextWriteOffset() < logPageSize
- logManager.getLogRecordHelper().getCommitLogSize()) {
// #. sleep for the groupCommitWaitTime
@@ -912,7 +916,8 @@
beforeFlushOffset = logManager.getLogPage(flushPageIndex).getBufferLastFlushOffset();
- // put the content to disk (the thread still has a lock on the log page)
+ // put the content to disk (the thread still has a lock
+ // on the log page)
logManager.getLogPage(flushPageIndex).flush();
afterFlushOffset = logManager.getLogPage(flushPageIndex).getBufferLastFlushOffset();
@@ -920,12 +925,14 @@
// increment the last flushed lsn
logManager.incrementLastFlushedLsn(afterFlushOffset - beforeFlushOffset);
- // increment currentLSN if currentLSN is less than flushLSN.
+ // increment currentLSN if currentLSN is less than
+ // flushLSN.
if (logManager.getLastFlushedLsn().get() + 1 > logManager.getCurrentLsn().get()) {
logManager.getCurrentLsn().set(logManager.getLastFlushedLsn().get() + 1);
}
- // Map the log page to a new region in the log file if the flushOffset reached the logPageSize
+ // Map the log page to a new region in the log file if
+ // the flushOffset reached the logPageSize
if (afterFlushOffset == logPageSize) {
long diskNextWriteOffset = logManager.getLogPages()[flushPageIndex]
.getDiskNextWriteOffset() + logBufferSize;
@@ -933,13 +940,11 @@
diskNextWriteOffset, flushPageIndex);
resetFlushPageIndex = true;
}
-
- // decrement activeTxnCountOnIndexes
- logManager.decrementActiveTxnCountOnIndexes(flushPageIndex);
-
} finally {
logManager.getLogPage(flushPageIndex).releaseWriteLatch();
}
+ // decrement activeTxnCountOnIndexes
+ logManager.decrementActiveTxnCountOnIndexes(flushPageIndex);
// #. checks the queue whether there is another flush
// request on the same log buffer
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
index a558969..6f6da4a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
@@ -19,6 +19,7 @@
import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.transactions.IRecoveryManager;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -56,7 +57,7 @@
try {
recoveryMgr.checkpoint(false);
lastMinMCTFirstLSN = currentMinMCTFirstLSN;
- } catch (ACIDException e) {
+ } catch (ACIDException | HyracksDataException e) {
throw new Error("failed to checkpoint", e);
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index ad1db1f..a7d803e 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -66,13 +66,13 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTreeImmutableComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTreeDiskComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallbackWrapper;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexImmutableComponent;
-import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeImmutableComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexDiskComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeDiskComponent;
import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
@@ -333,22 +333,21 @@
case ResourceType.LSM_BTREE:
for (ILSMComponent c : immutableDiskIndexList) {
- BTree btree = ((LSMBTreeImmutableComponent) c).getBTree();
+ BTree btree = ((LSMBTreeDiskComponent) c).getBTree();
maxDiskLastLSN = Math.max(getTreeIndexLSN(btree), maxDiskLastLSN);
}
break;
case ResourceType.LSM_RTREE:
for (ILSMComponent c : immutableDiskIndexList) {
- RTree rtree = ((LSMRTreeImmutableComponent) c).getRTree();
+ RTree rtree = ((LSMRTreeDiskComponent) c).getRTree();
maxDiskLastLSN = Math.max(getTreeIndexLSN(rtree), maxDiskLastLSN);
}
break;
case ResourceType.LSM_INVERTED_INDEX:
for (ILSMComponent c : immutableDiskIndexList) {
- BTree delKeyBtree = ((LSMInvertedIndexImmutableComponent) c)
- .getDeletedKeysBTree();
+ BTree delKeyBtree = ((LSMInvertedIndexDiskComponent) c).getDeletedKeysBTree();
maxDiskLastLSN = Math.max(getTreeIndexLSN(delKeyBtree), maxDiskLastLSN);
}
break;
@@ -430,7 +429,7 @@
}
@Override
- public synchronized void checkpoint(boolean isSharpCheckpoint) throws ACIDException {
+ public synchronized void checkpoint(boolean isSharpCheckpoint) throws ACIDException, HyracksDataException {
if (isSharpCheckpoint && LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting sharp checkpoint ... ");
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
index 97f2477..59a8363 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
@@ -16,22 +16,16 @@
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
import edu.uci.ics.asterix.common.context.BaseOperationTracker;
-import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import edu.uci.ics.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
-import edu.uci.ics.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -39,33 +33,13 @@
import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactory;
public class AsterixRuntimeComponentsProvider implements IIndexLifecycleManagerProvider, IStorageManagerInterface,
- ILSMIOOperationSchedulerProvider, ILSMMergePolicyProvider, ILSMOperationTrackerProvider,
+ ILSMIOOperationSchedulerProvider, ILSMMergePolicyProvider,
ILSMIOOperationCallbackProvider {
private static final long serialVersionUID = 1L;
- private final ILSMIOOperationCallbackFactory ioOpCallbackFactory;
- private final boolean isSecondary;
-
- public static final AsterixRuntimeComponentsProvider LSMBTREE_PRIMARY_PROVIDER = new AsterixRuntimeComponentsProvider(
- LSMBTreeIOOperationCallbackFactory.INSTANCE, false);
- public static final AsterixRuntimeComponentsProvider LSMBTREE_SECONDARY_PROVIDER = new AsterixRuntimeComponentsProvider(
- LSMBTreeIOOperationCallbackFactory.INSTANCE, true);
- public static final AsterixRuntimeComponentsProvider LSMRTREE_PROVIDER = new AsterixRuntimeComponentsProvider(
- LSMRTreeIOOperationCallbackFactory.INSTANCE, true);
- public static final AsterixRuntimeComponentsProvider LSMINVERTEDINDEX_PROVIDER = new AsterixRuntimeComponentsProvider(
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, true);
- public static final AsterixRuntimeComponentsProvider NOINDEX_PROVIDER = new AsterixRuntimeComponentsProvider(null,
- false);
-
- private AsterixRuntimeComponentsProvider(ILSMIOOperationCallbackFactory ioOpCallbackFactory, boolean isSecondary) {
- this.ioOpCallbackFactory = ioOpCallbackFactory;
- this.isSecondary = isSecondary;
- }
-
- @Override
- public ILSMOperationTracker getOperationTracker(IHyracksTaskContext ctx) {
- assert isSecondary;
- return new BaseOperationTracker(ioOpCallbackFactory);
+ public static final AsterixRuntimeComponentsProvider RUNTIME_PROVIDER = new AsterixRuntimeComponentsProvider();
+
+ private AsterixRuntimeComponentsProvider() {
}
@Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index 33522e3..432743a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -105,6 +105,7 @@
IModificationOperationCallback modificationCallback = (IModificationOperationCallback) cbIt.next();
BaseOperationTracker opTracker = (BaseOperationTracker) trackerIt.next();
if (exlusiveJobLevelCommit) {
+ // For metadata transactions only
opTracker.exclusiveJobCommitted();
} else {
opTracker.completeOperation(null, LSMOperationType.MODIFICATION, null, modificationCallback);
@@ -113,19 +114,6 @@
}
}
- @Override
- public int getActiveOperationCountOnIndexes() throws HyracksDataException {
- synchronized (indexes) {
- int count = 0;
- Iterator<AbstractOperationCallback> cbIt = callbacks.iterator();
- while (cbIt.hasNext()) {
- IModificationOperationCallback modificationCallback = (IModificationOperationCallback) cbIt.next();
- count += ((AbstractOperationCallback) modificationCallback).getLocalNumActiveOperations();
- }
- return count;
- }
- }
-
public void setTransactionType(TransactionType transactionType) {
this.transactionType = transactionType;
}
@@ -202,6 +190,11 @@
exlusiveJobLevelCommit = true;
}
+ @Override
+ public boolean isExlusiveJobLevelCommit() {
+ return exlusiveJobLevelCommit;
+ }
+
public String prettyPrint() {
StringBuilder sb = new StringBuilder();
sb.append("\n" + jobId + "\n");