Merge branch 'master' into zheilbron/asterix_issue470
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index d17d77c..da2e838 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -4,6 +4,7 @@
import java.util.List;
import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
+import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
import edu.uci.ics.asterix.common.dataflow.IAsterixApplicationContextInfo;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataManager;
@@ -201,19 +202,19 @@
AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
if (!isPartitioned) {
dataflowHelperFactory = new LSMInvertedIndexDataflowHelperFactory(
+ new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- storageProperties.getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
storageProperties.getBloomFilterFalsePositiveRate());
} else {
dataflowHelperFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
+ new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- storageProperties.getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
storageProperties.getBloomFilterFalsePositiveRate());
}
LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp = new LSMInvertedIndexSearchOperatorDescriptor(
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
index e64f68f..4ea338c 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
@@ -13,14 +13,11 @@
import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
import edu.uci.ics.asterix.common.context.AsterixFileMapManager;
import edu.uci.ics.asterix.common.context.ConstantMergePolicy;
+import edu.uci.ics.asterix.common.context.DatasetLifecycleManager;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
-import edu.uci.ics.asterix.transaction.management.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import edu.uci.ics.asterix.transaction.management.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
-import edu.uci.ics.asterix.transaction.management.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTrackerFactory;
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
@@ -29,10 +26,10 @@
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousScheduler;
import edu.uci.ics.hyracks.storage.common.buffercache.BufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
@@ -50,9 +47,6 @@
import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactoryProvider;
public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAsterixPropertiesProvider {
- private static final int DEFAULT_BUFFER_CACHE_PAGE_SIZE = 32768;
- private static final int DEFAULT_LIFECYCLEMANAGER_MEMORY_BUDGET = 1024 * 1024 * 1024; // 1GB
- private static final int DEFAULT_MAX_OPEN_FILES = Integer.MAX_VALUE;
private static final int METADATA_IO_DEVICE_ID = 0;
private final INCApplicationContext ncApplicationContext;
@@ -63,15 +57,12 @@
private AsterixStorageProperties storageProperties;
private AsterixTransactionProperties txnProperties;
- private IIndexLifecycleManager indexLifecycleManager;
+ private DatasetLifecycleManager indexLifecycleManager;
private IFileMapManager fileMapManager;
private IBufferCache bufferCache;
private ITransactionSubsystem txnSubsystem;
private ILSMMergePolicy mergePolicy;
- private ILSMOperationTrackerFactory lsmBTreeOpTrackerFactory;
- private ILSMOperationTrackerFactory lsmRTreeOpTrackerFactory;
- private ILSMOperationTrackerFactory lsmInvertedIndexOpTrackerFactory;
private ILSMIOOperationScheduler lsmIOScheduler;
private ILocalResourceRepository localResourceRepository;
private ResourceIdFactory resourceIdFactory;
@@ -101,21 +92,15 @@
storageProperties.getBufferCachePageSize(), storageProperties.getBufferCacheNumPages(),
storageProperties.getBufferCacheMaxOpenFiles());
- indexLifecycleManager = new IndexLifecycleManager(storageProperties.getMemoryComponentGlobalBudget());
-
lsmIOScheduler = SynchronousScheduler.INSTANCE;
mergePolicy = new ConstantMergePolicy(storageProperties.getLSMIndexMergeThreshold(), this);
- lsmBTreeOpTrackerFactory = new IndexOperationTrackerFactory(LSMBTreeIOOperationCallbackFactory.INSTANCE);
- lsmRTreeOpTrackerFactory = new IndexOperationTrackerFactory(LSMRTreeIOOperationCallbackFactory.INSTANCE);
- lsmInvertedIndexOpTrackerFactory = new IndexOperationTrackerFactory(
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE);
ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(
ioManager);
localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory
.createRepository();
resourceIdFactory = (new ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory();
-
+ indexLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository);
IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProviderForRecovery(
this);
txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider);
@@ -161,18 +146,6 @@
return storageProperties.getBloomFilterFalsePositiveRate();
}
- public ILSMOperationTrackerFactory getLSMBTreeOperationTrackerFactory() {
- return lsmBTreeOpTrackerFactory;
- }
-
- public ILSMOperationTrackerFactory getLSMRTreeOperationTrackerFactory() {
- return lsmRTreeOpTrackerFactory;
- }
-
- public ILSMOperationTrackerFactory getLSMInvertedIndexOperationTrackerFactory() {
- return lsmInvertedIndexOpTrackerFactory;
- }
-
public ILSMIOOperationScheduler getLSMIOScheduler() {
return lsmIOScheduler;
}
@@ -217,4 +190,14 @@
public AsterixExternalProperties getExternalProperties() {
return externalProperties;
}
+
+ @Override
+ public IVirtualBufferCache getVirtualBufferCache(int datasetID) {
+ return indexLifecycleManager.getVirtualBufferCache(datasetID);
+ }
+
+ @Override
+ public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) {
+ return indexLifecycleManager.getOperationTracker(datasetID);
+ }
}
\ No newline at end of file
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
index 8c73a63..1035f0c 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
@@ -7,7 +7,8 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
@@ -72,18 +73,14 @@
}
@Override
- public ILSMOperationTrackerFactory getLSMBTreeOperationTrackerFactory() {
- return asterixAppRuntimeContext.getLSMBTreeOperationTrackerFactory();
+ public IVirtualBufferCache getVirtualBufferCache(int datasetID) {
+ return asterixAppRuntimeContext.getVirtualBufferCache(datasetID);
}
@Override
- public ILSMOperationTrackerFactory getLSMRTreeOperationTrackerFactory() {
- return asterixAppRuntimeContext.getLSMRTreeOperationTrackerFactory();
- }
-
- @Override
- public ILSMOperationTrackerFactory getLSMInvertedIndexOperationTrackerFactory() {
- return asterixAppRuntimeContext.getLSMInvertedIndexOperationTrackerFactory();
+ public ILSMIOOperationCallbackProvider getNoOpIOOperationCallbackProvider() {
+ // TODO Auto-generated method stub
+ return null;
}
@Override
@@ -105,9 +102,7 @@
}
@Override
- public ILSMIOOperationCallbackProvider getNoOpIOOperationCallbackProvider() {
- // TODO Auto-generated method stub
- return null;
+ public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) {
+ return asterixAppRuntimeContext.getLSMBTreeOperationTracker(datasetID);
}
-
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 911f6fd..8ba7f9f 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -547,12 +547,13 @@
String datasetName = null;
String indexName = null;
JobSpecification spec = null;
+ Dataset ds = null;
try {
CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
dataverseName = getActiveDataverseName(stmtCreateIndex.getDataverseName());
datasetName = stmtCreateIndex.getDatasetName().getValue();
- Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
+ ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
datasetName);
if (ds == null) {
throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
@@ -638,7 +639,8 @@
metadataProvider.setMetadataTxnContext(mdTxnCtx);
CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
try {
- JobSpecification jobSpec = IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider);
+ JobSpecification jobSpec = IndexOperations
+ .buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
@@ -747,7 +749,8 @@
if (indexes.get(k).isSecondaryIndex()) {
CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
indexes.get(k).getIndexName());
- jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider,
+ datasets.get(j)));
}
}
@@ -859,7 +862,7 @@
if (indexes.get(j).isSecondaryIndex()) {
CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
indexes.get(j).getIndexName());
- jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
}
}
CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
@@ -965,7 +968,7 @@
}
//#. prepare a job to drop the index in NC.
CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
- jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
//#. mark PendingDropOp on the existing index
MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
index e22c215..47aaac8 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
@@ -22,10 +22,12 @@
import java.util.logging.Logger;
import edu.uci.ics.asterix.api.common.Job;
+import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.config.OptimizationConfUtil;
+import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.formats.base.IDataFormat;
@@ -40,7 +42,7 @@
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
-import edu.uci.ics.asterix.transaction.management.resource.ILocalResourceMetadata;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
import edu.uci.ics.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
@@ -123,12 +125,11 @@
AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
IndexDropOperatorDescriptor primaryBtreeDrop = new IndexDropOperatorDescriptor(specPrimary,
AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
- splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- storageProperties.getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
+ splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate()));
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop,
splitsAndConstraint.second);
@@ -170,20 +171,19 @@
AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
//prepare a LocalResourceMetadata which will be stored in NC's local resource repository
ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(typeTraits,
- comparatorFactories, blooFilterKeyFields, true, storageProperties.getMemoryComponentPageSize(),
- storageProperties.getMemoryComponentNumPages(), fs);
+ comparatorFactories, blooFilterKeyFields, true, fs, dataset.getDatasetId());
ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
localResourceMetadata, LocalResource.LSMBTreeResource);
TreeIndexCreateOperatorDescriptor indexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
splitsAndConstraint.first, typeTraits, comparatorFactories, blooFilterKeyFields,
- new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, storageProperties
- .getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
- storageProperties.getBloomFilterFalsePositiveRate()), localResourceFactoryProvider,
+ new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER, storageProperties
+ .getBloomFilterFalsePositiveRate()), localResourceFactoryProvider,
NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
splitsAndConstraint.second);
@@ -269,11 +269,11 @@
AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
splitsAndConstraint.first, typeTraits, comparatorFactories, blooFilterKeyFields, fieldPermutation,
GlobalConfig.DEFAULT_BTREE_FILL_FACTOR, false, numElementsHint, new LSMBTreeDataflowHelperFactory(
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- storageProperties.getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
+ new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate()), NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeBulkLoad,
splitsAndConstraint.second);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
index 90a1ef0..186056e 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
@@ -2,9 +2,11 @@
import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
import edu.uci.ics.asterix.common.config.OptimizationConfUtil;
+import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
@@ -39,12 +41,11 @@
}
public static JobSpecification buildDropSecondaryIndexJobSpec(CompiledIndexDropStatement indexDropStmt,
- AqlMetadataProvider metadataProvider) throws AlgebricksException, MetadataException {
+ AqlMetadataProvider metadataProvider, Dataset dataset) throws AlgebricksException, MetadataException {
String dataverseName = indexDropStmt.getDataverseName() == null ? metadataProvider.getDefaultDataverseName()
: indexDropStmt.getDataverseName();
String datasetName = indexDropStmt.getDatasetName();
String indexName = indexDropStmt.getIndexName();
-
JobSpecification spec = new JobSpecification();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
@@ -52,12 +53,11 @@
AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec,
AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
- splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- storageProperties.getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
+ splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate()));
AlgebricksPartitionConstraintHelper
.setPartitionConstraintInJobSpec(spec, btreeDrop, splitsAndConstraint.second);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
index ebd1a9c..ec55d96 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
@@ -1,9 +1,10 @@
package edu.uci.ics.asterix.file;
+import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
+import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.transaction.management.resource.ILocalResourceMetadata;
import edu.uci.ics.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
@@ -39,21 +40,19 @@
AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
//prepare a LocalResourceMetadata which will be stored in NC's local resource repository
ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(
- secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, secondaryBloomFilterKeyFields, false,
- storageProperties.getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
- secondaryFileSplitProvider.getFileSplits());
+ secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, secondaryBloomFilterKeyFields, true,
+ secondaryFileSplitProvider.getFileSplits(), dataset.getDatasetId());
ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
localResourceMetadata, LocalResource.LSMBTreeResource);
TreeIndexCreateOperatorDescriptor secondaryIndexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
secondaryFileSplitProvider, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories,
- secondaryBloomFilterKeyFields, new LSMBTreeDataflowHelperFactory(
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- storageProperties.getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
+ secondaryBloomFilterKeyFields, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate()), localResourceFactoryProvider,
NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
@@ -90,12 +89,12 @@
TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(
spec,
numSecondaryKeys,
- new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, storageProperties
- .getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
- storageProperties.getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
+ new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER, storageProperties
+ .getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
// Connect the operators.
spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
index b79102b..46f5b1a 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
@@ -22,6 +22,7 @@
import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
+import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.formats.nontagged.AqlBinaryBooleanInspectorImpl;
import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
@@ -39,6 +40,7 @@
import edu.uci.ics.asterix.runtime.evaluators.functions.AndDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.IsNullDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.NotDescriptor;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -279,12 +281,12 @@
AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
primaryFileSplitProvider, primaryRecDesc.getTypeTraits(), primaryComparatorFactories,
primaryBloomFilterKeyFields, lowKeyFields, highKeyFields, true, true,
- new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, storageProperties
- .getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
- storageProperties.getBloomFilterFalsePositiveRate()), false,
+ new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER, storageProperties
+ .getBloomFilterFalsePositiveRate()), false,
NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
primaryPartitionConstraint);
@@ -338,10 +340,11 @@
fieldPermutation[i] = i;
}
TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- secondaryFileSplitProvider, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories,
- secondaryBloomFilterKeyFields, fieldPermutation, fillFactor, false, numElementsHint,
- dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+ AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER, secondaryFileSplitProvider,
+ secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, secondaryBloomFilterKeyFields,
+ fieldPermutation, fillFactor, false, numElementsHint, dataflowHelperFactory,
+ NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
secondaryPartitionConstraint);
return treeIndexBulkLoadOp;
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
index ef94ec2..5e68053 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
@@ -2,16 +2,17 @@
import java.util.List;
+import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
+import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
import edu.uci.ics.asterix.runtime.formats.FormatUtils;
-import edu.uci.ics.asterix.transaction.management.resource.ILocalResourceMetadata;
import edu.uci.ics.asterix.transaction.management.resource.LSMInvertedIndexLocalResourceMetadata;
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
@@ -152,12 +153,10 @@
public JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException {
JobSpecification spec = new JobSpecification();
- AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
//prepare a LocalResourceMetadata which will be stored in NC's local resource repository
ILocalResourceMetadata localResourceMetadata = new LSMInvertedIndexLocalResourceMetadata(invListsTypeTraits,
- primaryComparatorFactories, tokenTypeTraits, tokenComparatorFactories, tokenizerFactory,
- storageProperties.getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
- isPartitioned, secondaryFileSplitProvider.getFileSplits());
+ primaryComparatorFactories, tokenTypeTraits, tokenComparatorFactories, tokenizerFactory, isPartitioned,
+ secondaryFileSplitProvider.getFileSplits(), dataset.getDatasetId());
ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
localResourceMetadata, LocalResource.LSMInvertedIndexResource);
@@ -265,20 +264,18 @@
private IIndexDataflowHelperFactory createDataflowHelperFactory() {
AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
if (!isPartitioned) {
- return new LSMInvertedIndexDataflowHelperFactory(
+ return new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- storageProperties.getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
storageProperties.getBloomFilterFalsePositiveRate());
} else {
- return new PartitionedLSMInvertedIndexDataflowHelperFactory(
+ return new PartitionedLSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
- storageProperties.getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
storageProperties.getBloomFilterFalsePositiveRate());
}
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
index b7e4886..b27de9f 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
@@ -2,8 +2,10 @@
import java.util.List;
+import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
+import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
@@ -14,7 +16,6 @@
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
-import edu.uci.ics.asterix.transaction.management.resource.ILocalResourceMetadata;
import edu.uci.ics.asterix.transaction.management.resource.LSMRTreeLocalResourceMetadata;
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
@@ -64,8 +65,8 @@
ILocalResourceMetadata localResourceMetadata = new LSMRTreeLocalResourceMetadata(
secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, primaryComparatorFactories,
valueProviderFactories, RTreePolicyType.RTREE, AqlMetadataProvider.proposeLinearizer(keyType,
- secondaryComparatorFactories.length), storageProperties.getMemoryComponentPageSize(),
- storageProperties.getMemoryComponentNumPages(), secondaryFileSplitProvider.getFileSplits());
+ secondaryComparatorFactories.length), secondaryFileSplitProvider.getFileSplits(),
+ dataset.getDatasetId());
ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
localResourceMetadata, LocalResource.LSMRTreeResource);
@@ -73,13 +74,13 @@
AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
secondaryFileSplitProvider, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, null,
new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
- primaryComparatorFactories, AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+ primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, AqlMetadataProvider.proposeLinearizer(
keyType, secondaryComparatorFactories.length), storageProperties
- .getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
- storageProperties.getBloomFilterFalsePositiveRate()), localResourceFactoryProvider,
+ .getBloomFilterFalsePositiveRate()), localResourceFactoryProvider,
NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
secondaryPartitionConstraint);
@@ -159,13 +160,13 @@
spec,
numNestedSecondaryKeyFields,
new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
- primaryComparatorFactories, AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+ primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, AqlMetadataProvider.proposeLinearizer(
keyType, secondaryComparatorFactories.length), storageProperties
- .getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
- storageProperties.getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
+ .getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
// Connect the operators.
spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index f609bf4..1d94da9 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -2113,6 +2113,11 @@
<output-dir compare="Text">float_01</output-dir>
</compilation-unit>
</test-case>
+ <!-- <test-case FilePath="misc">
+ <compilation-unit name="flushtest">
+ <output-dir compare="Text">flushtest</output-dir>
+ </compilation-unit>
+ </test-case> -->
<test-case FilePath="misc">
<compilation-unit name="groupby-orderby-count">
<output-dir compare="Text">groupby-orderby-count</output-dir>
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixAppRuntimeContext.java
index e053989..964b761 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -9,7 +9,8 @@
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
@@ -37,7 +38,7 @@
public ResourceIdFactory getResourceIdFactory();
- public ILSMOperationTrackerFactory getLSMBTreeOperationTrackerFactory();
+ public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID);
public void initialize() throws IOException, ACIDException, AsterixException;
@@ -47,4 +48,5 @@
public double getBloomFilterFalsePositiveRate();
+ public IVirtualBufferCache getVirtualBufferCache(int datasetID);
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixRuntimeComponentsProvider.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixRuntimeComponentsProvider.java
index d03b15c..d7bc0f3 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixRuntimeComponentsProvider.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixRuntimeComponentsProvider.java
@@ -5,7 +5,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
@@ -23,11 +23,11 @@
public ILSMMergePolicy getLSMMergePolicy();
- public ILSMOperationTrackerFactory getLSMBTreeOperationTrackerFactory();
+ public ILSMOperationTrackerProvider getLSMBTreeOperationTrackerFactory();
- public ILSMOperationTrackerFactory getLSMRTreeOperationTrackerFactory();
+ public ILSMOperationTrackerProvider getLSMRTreeOperationTrackerFactory();
- public ILSMOperationTrackerFactory getLSMInvertedIndexOperationTrackerFactory();
+ public ILSMOperationTrackerProvider getLSMInvertedIndexOperationTrackerFactory();
public ILSMIOOperationCallbackProvider getLSMBTreeIOOperationCallbackProvider();
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ILocalResourceMetadata.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/ILocalResourceMetadata.java
similarity index 86%
rename from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ILocalResourceMetadata.java
rename to asterix-common/src/main/java/edu/uci/ics/asterix/common/api/ILocalResourceMetadata.java
index 3b32dc2..9fdfe5f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ILocalResourceMetadata.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/ILocalResourceMetadata.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.asterix.transaction.management.resource;
+package edu.uci.ics.asterix.common.api;
import java.io.Serializable;
@@ -10,5 +10,6 @@
public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider runtimeContextProvider, String filePath,
int partition) throws HyracksDataException;
-
+
+ public int getDatasetID();
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixExternalProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixExternalProperties.java
index cf38932..6dd2ced 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixExternalProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixExternalProperties.java
@@ -8,7 +8,7 @@
private static int EXTERNAL_WEBPORT_DEFAULT = 19001;
private static final String EXTERNAL_LOGLEVEL_KEY = "log.level";
- private static Level EXTERNAL_LOGLEVEL_DEFAULT = Level.INFO;
+ private static Level EXTERNAL_LOGLEVEL_DEFAULT = Level.WARNING;
private static final String EXTERNAL_APISERVER_KEY = "api.port";
private static int EXTERNAL_APISERVER_DEFAULT = 19101;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixStorageProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixStorageProperties.java
index 91bdca6..253b30c 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixStorageProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixStorageProperties.java
@@ -15,10 +15,10 @@
private static final int STORAGE_MEMORYCOMPONENT_PAGESIZE_DEFAULT = (32 << 10); // 32KB
private static final String STORAGE_MEMORYCOMPONENT_NUMPAGES_KEY = "storage.memorycomponent.numpages";
- private static final int STORAGE_MEMORYCOMPONENT_NUMPAGES_DEFAULT = 2048; // ... so 64MB components
+ private static final int STORAGE_MEMORYCOMPONENT_NUMPAGES_DEFAULT = 8; // ... so 32MB components
private static final String STORAGE_MEMORYCOMPONENT_GLOBALBUDGET_KEY = "storage.memorycomponent.globalbudget";
- private static final long STORAGE_MEMORYCOMPONENT_GLOBALBUDGET_DEFAULT = (1 << 30); // 1GB
+ private static final long STORAGE_MEMORYCOMPONENT_GLOBALBUDGET_DEFAULT = 536870912; // 512MB
private static final String STORAGE_LSM_MERGETHRESHOLD_KEY = "storage.lsm.mergethreshold";
private static int STORAGE_LSM_MERGETHRESHOLD_DEFAULT = 3;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixVirtualBufferCacheProvider.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixVirtualBufferCacheProvider.java
new file mode 100644
index 0000000..c046e42
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixVirtualBufferCacheProvider.java
@@ -0,0 +1,23 @@
+package edu.uci.ics.asterix.common.context;
+
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
+
+public class AsterixVirtualBufferCacheProvider implements IVirtualBufferCacheProvider {
+
+ private static final long serialVersionUID = 1L;
+ private final int datasetID;
+
+ public AsterixVirtualBufferCacheProvider(int datasetID) {
+ this.datasetID = datasetID;
+ }
+
+ @Override
+ public IVirtualBufferCache getVirtualBufferCache(IHyracksTaskContext ctx) {
+ return ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
+ .getVirtualBufferCache(datasetID);
+ }
+
+}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
new file mode 100644
index 0000000..2e50bf6
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
@@ -0,0 +1,64 @@
+package edu.uci.ics.asterix.common.context;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
+
+public class BaseOperationTracker implements ILSMOperationTracker {
+
+ protected final ILSMIOOperationCallback ioOpCallback;
+ protected long lastLSN;
+ protected long firstLSN;
+
+ public BaseOperationTracker(ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
+ this.ioOpCallback = ioOpCallbackFactory == null ? NoOpIOOperationCallback.INSTANCE : ioOpCallbackFactory
+ .createIOOperationCallback(this);
+ resetLSNs();
+ }
+
+ public ILSMIOOperationCallback getIOOperationCallback() {
+ return ioOpCallback;
+ }
+
+ public long getLastLSN() {
+ return lastLSN;
+ }
+
+ public long getFirstLSN() {
+ return firstLSN;
+ }
+
+ public void updateLastLSN(long lastLSN) {
+ if (firstLSN == -1) {
+ firstLSN = lastLSN;
+ }
+ this.lastLSN = Math.max(this.lastLSN, lastLSN);
+ }
+
+ public void resetLSNs() {
+ lastLSN = -1;
+ firstLSN = -1;
+ }
+
+ @Override
+ public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
+ IModificationOperationCallback modificationCallback) throws HyracksDataException {
+ }
+
+ @Override
+ public void afterOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
+ IModificationOperationCallback modificationCallback) throws HyracksDataException {
+ }
+
+ @Override
+ public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
+ IModificationOperationCallback modificationCallback) throws HyracksDataException {
+ }
+
+}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
new file mode 100644
index 0000000..25bfbd1
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
@@ -0,0 +1,317 @@
+package edu.uci.ics.asterix.common.context;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
+import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.MultitenantVirtualBufferCache;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
+import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
+import edu.uci.ics.hyracks.storage.common.file.LocalResource;
+
+public class DatasetLifecycleManager implements IIndexLifecycleManager {
+ private final AsterixStorageProperties storageProperties;
+ private final Map<Integer, MultitenantVirtualBufferCache> datasetVirtualBufferCaches;
+ private final Map<Integer, ILSMOperationTracker> datasetOpTrackers;
+ private final Map<Integer, DatasetInfo> datasetInfos;
+ private final ILocalResourceRepository resourceRepository;
+ private final long capacity;
+ private long used;
+
+ public DatasetLifecycleManager(AsterixStorageProperties storageProperties,
+ ILocalResourceRepository resourceRepository) {
+ this.storageProperties = storageProperties;
+ this.resourceRepository = resourceRepository;
+ datasetVirtualBufferCaches = new HashMap<Integer, MultitenantVirtualBufferCache>();
+ datasetOpTrackers = new HashMap<Integer, ILSMOperationTracker>();
+ datasetInfos = new HashMap<Integer, DatasetInfo>();
+ capacity = storageProperties.getMemoryComponentGlobalBudget();
+ used = 0;
+ }
+
+ @Override
+ public synchronized IIndex getIndex(long resourceID) throws HyracksDataException {
+ DatasetInfo dsInfo = datasetInfos.get(getDIDfromRID(resourceID));
+ if (dsInfo == null) {
+ return null;
+ }
+ IndexInfo iInfo = dsInfo.indexes.get(resourceID);
+ if (iInfo == null) {
+ return null;
+ }
+ return iInfo.index;
+ }
+
+ @Override
+ public synchronized void register(long resourceID, IIndex index) throws HyracksDataException {
+ int did = getDIDfromRID(resourceID);
+ DatasetInfo dsInfo = datasetInfos.get(did);
+ if (dsInfo == null) {
+ dsInfo = new DatasetInfo(did);
+ } else if (dsInfo.indexes.containsKey(resourceID)) {
+ throw new HyracksDataException("Index with resource ID " + resourceID + " already exists.");
+ }
+ datasetInfos.put(did, dsInfo);
+ dsInfo.indexes.put(resourceID, new IndexInfo((ILSMIndex) index));
+ }
+
+ private int getDIDfromRID(long resourceID) throws HyracksDataException {
+ LocalResource lr = resourceRepository.getResourceById(resourceID);
+ if (lr == null) {
+ return -1;
+ }
+ return ((ILocalResourceMetadata) lr.getResourceObject()).getDatasetID();
+ }
+
+ @Override
+ public synchronized void unregister(long resourceID) throws HyracksDataException {
+ int did = getDIDfromRID(resourceID);
+ DatasetInfo dsInfo = datasetInfos.get(did);
+ IndexInfo iInfo = dsInfo.indexes.remove(resourceID);
+ if (dsInfo == null || iInfo == null) {
+ throw new HyracksDataException("Index with resource ID " + resourceID + " does not exist.");
+ }
+
+ if (iInfo.referenceCount != 0) {
+ dsInfo.indexes.put(resourceID, iInfo);
+ throw new HyracksDataException("Cannot remove index while it is open.");
+ }
+
+ if (iInfo.isOpen) {
+ iInfo.index.deactivate(true);
+ }
+
+ if (dsInfo.referenceCount == 0 && dsInfo.isOpen && dsInfo.indexes.isEmpty()) {
+ IVirtualBufferCache vbc = getVirtualBufferCache(did);
+ assert vbc != null;
+ used -= (vbc.getNumPages() * vbc.getPageSize());
+ datasetInfos.remove(did);
+ }
+
+ }
+
+ @Override
+ public synchronized void open(long resourceID) throws HyracksDataException {
+ int did = getDIDfromRID(resourceID);
+ DatasetInfo dsInfo = datasetInfos.get(did);
+ if (dsInfo == null) {
+ throw new HyracksDataException("Failed to open index with resource ID " + resourceID
+ + " since it does not exist.");
+ }
+
+ IndexInfo iInfo = dsInfo.indexes.get(resourceID);
+ if (iInfo == null) {
+ throw new HyracksDataException("Failed to open index with resource ID " + resourceID
+ + " since it does not exist.");
+ }
+
+ if (!dsInfo.isOpen) {
+ IVirtualBufferCache vbc = getVirtualBufferCache(did);
+ assert vbc != null;
+ long additionalSize = vbc.getNumPages() * vbc.getPageSize();
+ while (used + additionalSize > capacity) {
+ if (!evictCandidateDataset()) {
+ throw new HyracksDataException("Cannot activate index since memory budget would be exceeded.");
+ }
+ }
+ used += additionalSize;
+ }
+
+ dsInfo.isOpen = true;
+ dsInfo.touch();
+ if (!iInfo.isOpen) {
+ iInfo.index.activate();
+ iInfo.isOpen = true;
+ }
+ iInfo.touch();
+ }
+
+ private boolean evictCandidateDataset() throws HyracksDataException {
+ // Why min()? As a heuristic for eviction, we will take an open index (an index consuming memory)
+ // that is not being used (refcount == 0) and has been least recently used. The sort order defined
+ // for IndexInfo maintains this. See IndexInfo.compareTo().
+ DatasetInfo dsInfo = Collections.min(datasetInfos.values());
+ if (dsInfo.referenceCount == 0 && dsInfo.isOpen) {
+ for (IndexInfo iInfo : dsInfo.indexes.values()) {
+ if (iInfo.isOpen) {
+ iInfo.index.deactivate(true);
+ iInfo.isOpen = false;
+ }
+ assert iInfo.referenceCount == 0;
+ }
+
+ IVirtualBufferCache vbc = getVirtualBufferCache(dsInfo.datasetID);
+ used -= vbc.getNumPages() * vbc.getPageSize();
+ dsInfo.isOpen = false;
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public synchronized void close(long resourceID) throws HyracksDataException {
+ int did = getDIDfromRID(resourceID);
+ DatasetInfo dsInfo = datasetInfos.get(did);
+ if (dsInfo == null) {
+ throw new HyracksDataException("No index found with resourceID " + resourceID);
+ }
+ IndexInfo iInfo = dsInfo.indexes.get(resourceID);
+ if (iInfo == null) {
+ throw new HyracksDataException("No index found with resourceID " + resourceID);
+ }
+ iInfo.untouch();
+ dsInfo.untouch();
+ }
+
+ @Override
+ public synchronized List<IIndex> getOpenIndexes() {
+ List<IIndex> openIndexes = new ArrayList<IIndex>();
+ for (DatasetInfo dsInfo : datasetInfos.values()) {
+ for (IndexInfo iInfo : dsInfo.indexes.values()) {
+ if (iInfo.isOpen) {
+ openIndexes.add(iInfo.index);
+ }
+ }
+ }
+ return openIndexes;
+ }
+
+ public IVirtualBufferCache getVirtualBufferCache(int datasetID) {
+ synchronized (datasetVirtualBufferCaches) {
+ MultitenantVirtualBufferCache vbc = datasetVirtualBufferCaches.get(datasetID);
+ if (vbc == null) {
+ vbc = new MultitenantVirtualBufferCache(new VirtualBufferCache(new HeapBufferAllocator(),
+ storageProperties.getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages()));
+ datasetVirtualBufferCaches.put(datasetID, vbc);
+ }
+ return vbc;
+ }
+ }
+
+ public ILSMOperationTracker getOperationTracker(int datasetID) {
+ synchronized (datasetOpTrackers) {
+ ILSMOperationTracker opTracker = datasetOpTrackers.get(datasetID);
+ if (opTracker == null) {
+ opTracker = new PrimaryIndexOperationTracker(this, datasetID,
+ LSMBTreeIOOperationCallbackFactory.INSTANCE);
+ datasetOpTrackers.put(datasetID, opTracker);
+ }
+
+ return opTracker;
+ }
+ }
+
+ public synchronized Set<ILSMIndex> getDatasetIndexes(int datasetID) throws HyracksDataException {
+ DatasetInfo dsInfo = datasetInfos.get(datasetID);
+ if (dsInfo == null) {
+ throw new HyracksDataException("No dataset found with datasetID " + datasetID);
+ }
+ Set<ILSMIndex> datasetIndexes = new HashSet<ILSMIndex>();
+ for (IndexInfo iInfo : dsInfo.indexes.values()) {
+ datasetIndexes.add(iInfo.index);
+ }
+ return datasetIndexes;
+ }
+
+ private static abstract class Info {
+ protected int referenceCount;
+ protected boolean isOpen;
+
+ public Info() {
+ referenceCount = 0;
+ isOpen = false;
+ }
+
+ public void touch() {
+ ++referenceCount;
+ }
+
+ public void untouch() {
+ --referenceCount;
+ }
+ }
+
+ private static class IndexInfo extends Info {
+ private ILSMIndex index;
+
+ public IndexInfo(ILSMIndex index) {
+ this.index = index;
+ }
+ }
+
+ private static class DatasetInfo extends Info implements Comparable<DatasetInfo> {
+ private final Map<Long, IndexInfo> indexes;
+ private final int datasetID;
+ private long lastAccess;
+
+ public DatasetInfo(int datasetID) {
+ this.indexes = new HashMap<Long, IndexInfo>();
+ this.lastAccess = -1;
+ this.datasetID = datasetID;
+ }
+
+ public void touch() {
+ super.touch();
+ lastAccess = System.currentTimeMillis();
+ }
+
+ public void untouch() {
+ super.untouch();
+ lastAccess = System.currentTimeMillis();
+ }
+
+ @Override
+ public int compareTo(DatasetInfo i) {
+ // sort by (isOpen, referenceCount, lastAccess) ascending, where true < false
+ //
+ // Example sort order:
+ // -------------------
+ // (F, 0, 70) <-- largest
+ // (F, 0, 60)
+ // (T, 10, 80)
+ // (T, 10, 70)
+ // (T, 9, 90)
+ // (T, 0, 100) <-- smallest
+ if (isOpen && !i.isOpen) {
+ return -1;
+ } else if (!isOpen && i.isOpen) {
+ return 1;
+ } else {
+ if (referenceCount < i.referenceCount) {
+ return -1;
+ } else if (referenceCount > i.referenceCount) {
+ return 1;
+ } else {
+ if (lastAccess < i.lastAccess) {
+ return -1;
+ } else if (lastAccess > i.lastAccess) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+ }
+
+ }
+
+ public String toString() {
+ return "DatasetID: " + datasetID + ", isOpen: " + isOpen + ", refCount: " + referenceCount
+ + ", lastAccess: " + lastAccess + "}";
+ }
+ }
+
+}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
new file mode 100644
index 0000000..0d7dd93
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.common.context;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
+
+public class PrimaryIndexOperationTracker extends BaseOperationTracker {
+
+ private final DatasetLifecycleManager datasetLifecycleManager;
+ private final IVirtualBufferCache datasetBufferCache;
+ private final int datasetID;
+ // Number of active operations on a ILSMIndex instance.
+ private AtomicInteger numActiveOperations;
+
+ public PrimaryIndexOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID,
+ ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
+ super(ioOpCallbackFactory);
+ this.datasetLifecycleManager = datasetLifecycleManager;
+ this.numActiveOperations = new AtomicInteger(0);
+ this.datasetID = datasetID;
+ datasetBufferCache = datasetLifecycleManager.getVirtualBufferCache(datasetID);
+ }
+
+ @Override
+ public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
+ IModificationOperationCallback modificationCallback) throws HyracksDataException {
+ numActiveOperations.incrementAndGet();
+
+ // Increment transactor-local active operations count.
+ AbstractOperationCallback opCallback = getOperationCallback(searchCallback, modificationCallback);
+ if (opCallback != null) {
+ opCallback.incrementLocalNumActiveOperations();
+ }
+ }
+
+ @Override
+ public void afterOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
+ IModificationOperationCallback modificationCallback) throws HyracksDataException {
+ // Searches are immediately considered complete, because they should not prevent the execution of flushes.
+ if ((searchCallback != null && searchCallback != NoOpOperationCallback.INSTANCE)
+ || opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+ completeOperation(index, opType, searchCallback, modificationCallback);
+ }
+ }
+
+ @Override
+ public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
+ IModificationOperationCallback modificationCallback) throws HyracksDataException {
+ int nActiveOps = numActiveOperations.decrementAndGet();
+
+ // Decrement transactor-local active operations count.
+ AbstractOperationCallback opCallback = getOperationCallback(searchCallback, modificationCallback);
+ if (opCallback != null) {
+ opCallback.decrementLocalNumActiveOperations();
+ }
+ // If we need a flush, and this is the last completing operation, then schedule the flush.
+ if (datasetBufferCache.isFull() && nActiveOps == 0 && opType != LSMOperationType.FLUSH) {
+ Set<ILSMIndex> indexes = datasetLifecycleManager.getDatasetIndexes(datasetID);
+ for (ILSMIndex lsmIndex : indexes) {
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) lsmIndex.createAccessor(
+ NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ accessor.scheduleFlush(ioOpCallback);
+ }
+
+ }
+ }
+
+ private AbstractOperationCallback getOperationCallback(ISearchOperationCallback searchCallback,
+ IModificationOperationCallback modificationCallback) {
+
+ if (modificationCallback == NoOpOperationCallback.INSTANCE || modificationCallback == null) {
+ return null;
+ }
+ if (searchCallback != null && searchCallback != NoOpOperationCallback.INSTANCE) {
+ return (AbstractOperationCallback) searchCallback;
+ } else {
+ return (AbstractOperationCallback) modificationCallback;
+ }
+ }
+
+}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
new file mode 100644
index 0000000..93a86e4
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -0,0 +1,71 @@
+package edu.uci.ics.asterix.common.dataflow;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
+
+public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
+
+ private final boolean isPrimary;
+
+ public AsterixLSMInsertDeleteOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+ int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, IndexOperation op,
+ boolean isPrimary) {
+ super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, op);
+ this.isPrimary = isPrimary;
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ boolean first = true;
+ accessor.reset(buffer);
+ ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
+ int tupleCount = accessor.getTupleCount();
+ try {
+ for (int i = 0; i < tupleCount; i++) {
+ if (tupleFilter != null) {
+ frameTuple.reset(accessor, i);
+ if (!tupleFilter.accept(frameTuple)) {
+ lsmAccessor.noOp();
+ continue;
+ }
+ }
+ tuple.reset(accessor, i);
+ switch (op) {
+ case INSERT:
+ if (first && isPrimary) {
+ lsmAccessor.insert(tuple);
+ first = false;
+ } else {
+ lsmAccessor.forceInsert(tuple);
+ }
+ break;
+ case DELETE:
+ if (first && isPrimary) {
+ lsmAccessor.delete(tuple);
+ first = false;
+ } else {
+ lsmAccessor.forceDelete(tuple);
+ }
+ break;
+ default: {
+ throw new HyracksDataException("Unsupported operation " + op
+ + " in tree index InsertDelete operator");
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ System.arraycopy(buffer.array(), 0, writeBuffer.array(), 0, buffer.capacity());
+ FrameUtils.flushFrame(writeBuffer, writer);
+ }
+
+}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor.java
new file mode 100644
index 0000000..15ad465
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor.java
@@ -0,0 +1,42 @@
+package edu.uci.ics.asterix.common.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilterFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexInsertUpdateDeleteOperator;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+
+public class AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor extends LSMInvertedIndexInsertUpdateDeleteOperator {
+
+ private static final long serialVersionUID = 1L;
+
+ public AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor(IOperatorDescriptorRegistry spec,
+ RecordDescriptor recDesc, IStorageManagerInterface storageManager, IFileSplitProvider fileSplitProvider,
+ IIndexLifecycleManagerProvider lifecycleManagerProvider, ITypeTraits[] tokenTypeTraits,
+ IBinaryComparatorFactory[] tokenComparatorFactories, ITypeTraits[] invListsTypeTraits,
+ IBinaryComparatorFactory[] invListComparatorFactories, IBinaryTokenizerFactory tokenizerFactory,
+ int[] fieldPermutation, IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory,
+ ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory modificationOpCallbackFactory) {
+ super(spec, recDesc, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
+ tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
+ fieldPermutation, op, dataflowHelperFactory, tupleFilterFactory, modificationOpCallbackFactory);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new AsterixLSMInsertDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation,
+ recordDescProvider, op, false);
+ }
+}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java
new file mode 100644
index 0000000..b1bd6ce
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java
@@ -0,0 +1,45 @@
+package edu.uci.ics.asterix.common.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilterFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+
+public class AsterixLSMTreeInsertDeleteOperatorDescriptor extends LSMTreeIndexInsertUpdateDeleteOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ private final boolean isPrimary;
+
+ public AsterixLSMTreeInsertDeleteOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lifecycleManagerProvider,
+ IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
+ IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields, int[] fieldPermutation,
+ IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory,
+ ITupleFilterFactory tupleFilterFactory,
+ IModificationOperationCallbackFactory modificationOpCallbackProvider, boolean isPrimary) {
+ super(spec, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, bloomFilterKeyFields, fieldPermutation, op, dataflowHelperFactory,
+ tupleFilterFactory, modificationOpCallbackProvider);
+ this.isPrimary = isPrimary;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new AsterixLSMInsertDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation,
+ recordDescProvider, op, isPrimary);
+ }
+
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
similarity index 91%
rename from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/AbstractLSMIOOperationCallback.java
rename to asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index 74f39ad..c9c01e8 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -13,11 +13,11 @@
* limitations under the License.
*/
-package edu.uci.ics.asterix.transaction.management.ioopcallbacks;
+package edu.uci.ics.asterix.common.ioopcallbacks;
import java.util.List;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
@@ -29,9 +29,9 @@
public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationCallback {
- protected final IndexOperationTracker opTracker;
+ protected final BaseOperationTracker opTracker;
- public AbstractLSMIOOperationCallback(IndexOperationTracker opTracker) {
+ public AbstractLSMIOOperationCallback(BaseOperationTracker opTracker) {
this.opTracker = opTracker;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
similarity index 89%
rename from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallback.java
rename to asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
index 382a6d0..1fe77a0 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -13,11 +13,11 @@
* limitations under the License.
*/
-package edu.uci.ics.asterix.transaction.management.ioopcallbacks;
+package edu.uci.ics.asterix.common.ioopcallbacks;
import java.util.List;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTreeImmutableComponent;
@@ -25,7 +25,7 @@
public class LSMBTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
- public LSMBTreeIOOperationCallback(IndexOperationTracker opTracker) {
+ public LSMBTreeIOOperationCallback(BaseOperationTracker opTracker) {
super(opTracker);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
similarity index 83%
rename from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
rename to asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
index a51da07..7524ddb 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
@@ -13,23 +13,23 @@
* limitations under the License.
*/
-package edu.uci.ics.asterix.transaction.management.ioopcallbacks;
+package edu.uci.ics.asterix.common.ioopcallbacks;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
public class LSMBTreeIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory {
private static final long serialVersionUID = 1L;
-
+
public static LSMBTreeIOOperationCallbackFactory INSTANCE = new LSMBTreeIOOperationCallbackFactory();
-
+
private LSMBTreeIOOperationCallbackFactory() {
}
@Override
public ILSMIOOperationCallback createIOOperationCallback(Object syncObj) {
- return new LSMBTreeIOOperationCallback((IndexOperationTracker) syncObj);
+ return new LSMBTreeIOOperationCallback((BaseOperationTracker) syncObj);
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
similarity index 89%
rename from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
rename to asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index 0782c67..fb30742 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -13,18 +13,18 @@
* limitations under the License.
*/
-package edu.uci.ics.asterix.transaction.management.ioopcallbacks;
+package edu.uci.ics.asterix.common.ioopcallbacks;
import java.util.List;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexImmutableComponent;
public class LSMInvertedIndexIOOperationCallback extends AbstractLSMIOOperationCallback {
- public LSMInvertedIndexIOOperationCallback(IndexOperationTracker opTracker) {
+ public LSMInvertedIndexIOOperationCallback(BaseOperationTracker opTracker) {
super(opTracker);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
similarity index 82%
rename from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
rename to asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
index 790c60c..5e5b0ed 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
@@ -13,9 +13,9 @@
* limitations under the License.
*/
-package edu.uci.ics.asterix.transaction.management.ioopcallbacks;
+package edu.uci.ics.asterix.common.ioopcallbacks;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
@@ -24,12 +24,12 @@
private static final long serialVersionUID = 1L;
public static LSMInvertedIndexIOOperationCallbackFactory INSTANCE = new LSMInvertedIndexIOOperationCallbackFactory();
-
+
private LSMInvertedIndexIOOperationCallbackFactory() {
}
-
+
@Override
public ILSMIOOperationCallback createIOOperationCallback(Object syncObj) {
- return new LSMInvertedIndexIOOperationCallback((IndexOperationTracker) syncObj);
+ return new LSMInvertedIndexIOOperationCallback((BaseOperationTracker) syncObj);
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
similarity index 89%
rename from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallback.java
rename to asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
index b2a59b4..324cccb 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -13,18 +13,18 @@
* limitations under the License.
*/
-package edu.uci.ics.asterix.transaction.management.ioopcallbacks;
+package edu.uci.ics.asterix.common.ioopcallbacks;
import java.util.List;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeImmutableComponent;
public class LSMRTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
- public LSMRTreeIOOperationCallback(IndexOperationTracker opTracker) {
+ public LSMRTreeIOOperationCallback(BaseOperationTracker opTracker) {
super(opTracker);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
similarity index 83%
rename from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
rename to asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
index 4b47a95..748d93a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
@@ -13,23 +13,23 @@
* limitations under the License.
*/
-package edu.uci.ics.asterix.transaction.management.ioopcallbacks;
+package edu.uci.ics.asterix.common.ioopcallbacks;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
public class LSMRTreeIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory {
private static final long serialVersionUID = 1L;
-
+
public static LSMRTreeIOOperationCallbackFactory INSTANCE = new LSMRTreeIOOperationCallbackFactory();
-
+
private LSMRTreeIOOperationCallbackFactory() {
}
@Override
public ILSMIOOperationCallback createIOOperationCallback(Object syncObj) {
- return new LSMRTreeIOOperationCallback((IndexOperationTracker) syncObj);
+ return new LSMRTreeIOOperationCallback((BaseOperationTracker) syncObj);
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java
index dc7b543..5e53060 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java
@@ -15,6 +15,8 @@
package edu.uci.ics.asterix.common.transactions;
+import java.util.concurrent.atomic.AtomicInteger;
+
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit;
@@ -26,7 +28,7 @@
protected final int[] primaryKeyFields;
protected final ITransactionContext txnCtx;
protected final ILockManager lockManager;
- protected int transactorLocalNumActiveOperations = 0;
+ protected final AtomicInteger transactorLocalNumActiveOperations;
protected final long[] longHashes;
public AbstractOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
@@ -35,6 +37,7 @@
this.primaryKeyFields = primaryKeyFields;
this.txnCtx = txnCtx;
this.lockManager = lockManager;
+ this.transactorLocalNumActiveOperations = new AtomicInteger(0);
this.longHashes = new long[2];
}
@@ -44,14 +47,14 @@
}
public int getLocalNumActiveOperations() {
- return transactorLocalNumActiveOperations;
+ return transactorLocalNumActiveOperations.get();
}
public void incrementLocalNumActiveOperations() {
- transactorLocalNumActiveOperations++;
+ transactorLocalNumActiveOperations.incrementAndGet();
}
public void decrementLocalNumActiveOperations() {
- transactorLocalNumActiveOperations--;
+ transactorLocalNumActiveOperations.decrementAndGet();
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
index f762b16..b53698b 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
@@ -5,7 +5,8 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
@@ -25,11 +26,7 @@
public ILSMMergePolicy getLSMMergePolicy();
- public ILSMOperationTrackerFactory getLSMBTreeOperationTrackerFactory();
-
- public ILSMOperationTrackerFactory getLSMRTreeOperationTrackerFactory();
-
- public ILSMOperationTrackerFactory getLSMInvertedIndexOperationTrackerFactory();
+ public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID);
public ILSMIOOperationCallbackProvider getLSMBTreeIOOperationCallbackProvider();
@@ -46,4 +43,6 @@
public ResourceIdFactory getResourceIdFactory();
public IIOManager getIOManager();
+
+ public IVirtualBufferCache getVirtualBufferCache(int datasetID);
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
index 6e42e2d..79fbf3b 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
@@ -64,8 +64,10 @@
* @param txnContext
* @throws ACIDException
* TODO
+ * @return true if the lock count is 0, false otherwise.
*/
- public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext) throws ACIDException;
+ public boolean unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext)
+ throws ACIDException;
/**
* @param datasetId
@@ -73,8 +75,9 @@
* @param txnContext
* @throws ACIDException
* TODO
+ * @return true if the lock count is 0, false otherwise.
*/
- public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext, boolean commitFlag)
+ public boolean unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext, boolean commitFlag)
throws ACIDException;
/**
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILoggerRepository.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILoggerRepository.java
index 036d66c..323b9ce 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILoggerRepository.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILoggerRepository.java
@@ -1,6 +1,8 @@
package edu.uci.ics.asterix.common.transactions;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+
public interface ILoggerRepository {
- public ILogger getIndexLogger(long resourceId, byte resourceType);
+ public ILogger getIndexLogger(long resourceId, byte resourceType) throws ACIDException;
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
index a06cc75..1ffd5f1 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
@@ -1,7 +1,6 @@
package edu.uci.ics.asterix.common.transactions;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.transactions.ITransactionContext.TransactionType;
import edu.uci.ics.asterix.common.transactions.ITransactionManager.TransactionState;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -14,6 +13,8 @@
public void decreaseActiveTransactionCountOnIndexes() throws HyracksDataException;
+ public int getActiveOperationCountOnIndexes() throws HyracksDataException;
+
public LogicalLogLocator getFirstLogLocator();
public LogicalLogLocator getLastLogLocator();
diff --git a/asterix-installer/src/main/resources/conf/asterix-configuration.xml b/asterix-installer/src/main/resources/conf/asterix-configuration.xml
index bfec2db..6c2b757 100644
--- a/asterix-installer/src/main/resources/conf/asterix-configuration.xml
+++ b/asterix-installer/src/main/resources/conf/asterix-configuration.xml
@@ -47,9 +47,9 @@
<property>
<name>storage.memorycomponent.numpages</name>
- <value>4096</value>
+ <value>1024</value>
<description>The number of pages to allocate for a memory component.
- (Default = 4096)
+ (Default = 1024)
</description>
</property>
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index 5aca7c4..6485c16 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -25,6 +25,7 @@
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
import edu.uci.ics.asterix.common.transactions.DatasetId;
import edu.uci.ics.asterix.common.transactions.IResourceManager.ResourceType;
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
@@ -273,6 +274,7 @@
ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
txnCtx.setTransactionType(TransactionType.READ_WRITE);
+ txnCtx.registerIndexAndCallback(lsmIndex, (AbstractOperationCallback) modCallback);
// TODO: fix exceptions once new BTree exception model is in hyracks.
indexAccessor.insert(tuple);
@@ -575,6 +577,7 @@
ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
txnCtx.setTransactionType(TransactionType.READ_WRITE);
+ txnCtx.registerIndexAndCallback(lsmIndex, (AbstractOperationCallback) modCallback);
indexAccessor.delete(tuple);
indexLifecycleManager.close(resourceID);
@@ -974,8 +977,8 @@
public void initializeDatasetIdFactory(JobId jobId) throws MetadataException, RemoteException {
int mostRecentDatasetId = MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID;
long resourceID = MetadataPrimaryIndexes.DATASET_DATASET.getResourceID();
- IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
try {
+ IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
indexLifecycleManager.open(resourceID);
IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
@@ -991,6 +994,8 @@
try {
while (rangeCursor.hasNext()) {
rangeCursor.next();
+ ITupleReference ref = rangeCursor.getTuple();
+ Dataset ds = valueExtractor.getValue(jobId, rangeCursor.getTuple());
datasetId = ((Dataset) valueExtractor.getValue(jobId, rangeCursor.getTuple())).getDatasetId();
if (mostRecentDatasetId < datasetId) {
mostRecentDatasetId = datasetId;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 422fdfd..b4f4ee5 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -27,12 +27,14 @@
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
-import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import edu.uci.ics.asterix.common.transactions.IResourceManager.ResourceType;
import edu.uci.ics.asterix.common.transactions.TransactionalResourceManagerRepository;
import edu.uci.ics.asterix.external.adapter.factory.IAdapterFactory;
@@ -57,7 +59,6 @@
import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
-import edu.uci.ics.asterix.transaction.management.resource.ILocalResourceMetadata;
import edu.uci.ics.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
import edu.uci.ics.asterix.transaction.management.service.logging.IndexResourceManager;
@@ -69,24 +70,18 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IIOManager;
-import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.util.IndexFileNameUtil;
import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree;
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.IInMemoryBufferCache;
-import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
-import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
-import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
import edu.uci.ics.hyracks.storage.common.file.ILocalResourceFactory;
import edu.uci.ics.hyracks.storage.common.file.ILocalResourceFactoryProvider;
import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
import edu.uci.ics.hyracks.storage.common.file.LocalResource;
-import edu.uci.ics.hyracks.storage.common.file.TransientFileMapManager;
/**
* Initializes the remote metadata storage facilities ("universe") using a
@@ -99,8 +94,6 @@
*/
public class MetadataBootstrap {
private static final Logger LOGGER = Logger.getLogger(MetadataBootstrap.class.getName());
- private static final int DEFAULT_MEM_PAGE_SIZE = 32768;
- private static final int DEFAULT_MEM_NUM_PAGES = 100;
private static IAsterixAppRuntimeContext runtimeContext;
@@ -215,6 +208,7 @@
//change the exception type to AbortFailureException
throw new MetadataException(e);
}
+ throw e;
}
}
@@ -338,46 +332,43 @@
metadataStore + File.separator + index.getFileNameRelativePath(),
runtimeContext.getMetaDataIODeviceId());
FileReference file = new FileReference(new File(filePath));
- IInMemoryBufferCache memBufferCache = new InMemoryBufferCache(new HeapBufferAllocator(), DEFAULT_MEM_PAGE_SIZE,
- DEFAULT_MEM_NUM_PAGES, new TransientFileMapManager());
+ IVirtualBufferCache virtualBufferCache = runtimeContext.getVirtualBufferCache(index.getDatasetId().getId());
ITypeTraits[] typeTraits = index.getTypeTraits();
IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
int[] bloomFilterKeyFields = index.getBloomFilterKeyFields();
- ITreeIndexMetaDataFrameFactory metaDataFrameFactory = new LIFOMetaDataFrameFactory();
- IInMemoryFreePageManager memFreePageManager = new InMemoryFreePageManager(DEFAULT_MEM_NUM_PAGES,
- metaDataFrameFactory);
LSMBTree lsmBtree = null;
long resourceID = -1;
+ AsterixRuntimeComponentsProvider rtcProvider = index.isPrimaryIndex() ? AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER
+ : AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER;
+ ILSMOperationTracker opTracker = index.isPrimaryIndex() ? runtimeContext.getLSMBTreeOperationTracker(index
+ .getDatasetId().getId()) : new BaseOperationTracker(LSMBTreeIOOperationCallbackFactory.INSTANCE);
if (create) {
- lsmBtree = LSMBTreeUtils.createLSMTree(memBufferCache, memFreePageManager, ioManager, file, bufferCache,
- fileMapProvider, typeTraits, comparatorFactories, bloomFilterKeyFields,
- runtimeContext.getBloomFilterFalsePositiveRate(), runtimeContext.getLSMMergePolicy(),
- runtimeContext.getLSMBTreeOperationTrackerFactory(), runtimeContext.getLSMIOScheduler(),
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, runtimeContext.getMetaDataIODeviceId());
+ lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCache, ioManager, file, bufferCache, fileMapProvider,
+ typeTraits, comparatorFactories, bloomFilterKeyFields,
+ runtimeContext.getBloomFilterFalsePositiveRate(), runtimeContext.getLSMMergePolicy(), opTracker,
+ runtimeContext.getLSMIOScheduler(), rtcProvider, runtimeContext.getMetaDataIODeviceId());
lsmBtree.create();
resourceID = runtimeContext.getResourceIdFactory().createId();
- indexLifecycleManager.register(resourceID, lsmBtree);
-
- AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(typeTraits,
comparatorFactories, bloomFilterKeyFields, index.isPrimaryIndex(),
- storageProperties.getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
- runtimeContext.getMetaDataIODeviceId());
+ runtimeContext.getMetaDataIODeviceId(), index.getDatasetId().getId());
ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
localResourceMetadata, LocalResource.LSMBTreeResource);
ILocalResourceFactory localResourceFactory = localResourceFactoryProvider.getLocalResourceFactory();
localResourceRepository.insert(
localResourceFactory.createLocalResource(resourceID, file.getFile().getPath(), 0),
runtimeContext.getMetaDataIODeviceId());
+ indexLifecycleManager.register(resourceID, lsmBtree);
} else {
resourceID = localResourceRepository.getResourceByName(file.getFile().getPath()).getResourceId();
lsmBtree = (LSMBTree) indexLifecycleManager.getIndex(resourceID);
if (lsmBtree == null) {
- lsmBtree = LSMBTreeUtils.createLSMTree(memBufferCache, memFreePageManager, ioManager, file,
- bufferCache, fileMapProvider, typeTraits, comparatorFactories, bloomFilterKeyFields,
+ lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCache, ioManager, file, bufferCache,
+ fileMapProvider, typeTraits, comparatorFactories, bloomFilterKeyFields,
runtimeContext.getBloomFilterFalsePositiveRate(), runtimeContext.getLSMMergePolicy(),
- runtimeContext.getLSMBTreeOperationTrackerFactory(), runtimeContext.getLSMIOScheduler(),
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, runtimeContext.getMetaDataIODeviceId());
+ opTracker, runtimeContext.getLSMIOScheduler(),
+ AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ runtimeContext.getMetaDataIODeviceId());
indexLifecycleManager.register(resourceID, lsmBtree);
}
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 117f492..998eac4 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -27,8 +27,11 @@
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
import edu.uci.ics.asterix.common.context.ITransactionSubsystemProvider;
import edu.uci.ics.asterix.common.context.TransactionSubsystemProvider;
+import edu.uci.ics.asterix.common.dataflow.AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor;
+import edu.uci.ics.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
import edu.uci.ics.asterix.common.dataflow.IAsterixApplicationContextInfo;
import edu.uci.ics.asterix.common.parse.IParseFileSplitsDecl;
import edu.uci.ics.asterix.common.transactions.IResourceManager.ResourceType;
@@ -74,6 +77,7 @@
import edu.uci.ics.asterix.runtime.job.listener.JobEventListenerFactory;
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexSearchOperationCallbackFactory;
import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
@@ -129,9 +133,7 @@
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexInsertUpdateDeleteOperator;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.dataflow.LSMRTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
@@ -542,17 +544,15 @@
primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
}
}
+ AsterixRuntimeComponentsProvider rtcProvider = isSecondary ? AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER
+ : AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER;
BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), spPc.first,
typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields,
lowKeyInclusive, highKeyInclusive, new LSMBTreeDataflowHelperFactory(
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- storageProperties.getMemoryComponentPageSize(),
- storageProperties.getMemoryComponentNumPages(),
- storageProperties.getBloomFilterFalsePositiveRate()), retainInput, searchCallbackFactory);
+ new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), rtcProvider, rtcProvider,
+ rtcProvider, rtcProvider, storageProperties.getBloomFilterFalsePositiveRate()),
+ retainInput, searchCallbackFactory);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
@@ -616,13 +616,12 @@
appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), spPc.first,
typeTraits, comparatorFactories, keyFields, new LSMRTreeDataflowHelperFactory(
valueProviderFactories, RTreePolicyType.RTREE, primaryComparatorFactories,
+ new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, proposeLinearizer(
nestedKeyType.getTypeTag(), comparatorFactories.length),
- storageProperties.getMemoryComponentPageSize(),
- storageProperties.getMemoryComponentNumPages(),
storageProperties.getBloomFilterFalsePositiveRate()), retainInput, searchCallbackFactory);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeSearchOp, spPc.second);
@@ -780,12 +779,11 @@
appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation,
GlobalConfig.DEFAULT_BTREE_FILL_FACTOR, false, numElementsHint, new LSMBTreeDataflowHelperFactory(
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- storageProperties.getMemoryComponentPageSize(),
- storageProperties.getMemoryComponentNumPages(),
+ new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate()), NoOpOperationCallbackFactory.INSTANCE);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
splitsAndConstraint.second);
@@ -845,18 +843,18 @@
PrimaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new PrimaryIndexModificationOperationCallbackFactory(
jobId, datasetId, primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE);
- LSMTreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new LSMTreeIndexInsertUpdateDeleteOperatorDescriptor(
+ AsterixLSMTreeInsertDeleteOperatorDescriptor insertDeleteOp = new AsterixLSMTreeInsertDeleteOperatorDescriptor(
spec, recordDesc, appContext.getStorageManagerInterface(),
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp,
- new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, storageProperties
- .getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
- storageProperties.getBloomFilterFalsePositiveRate()), null, modificationCallbackFactory);
+ new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
+ AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER, storageProperties
+ .getBloomFilterFalsePositiveRate()), null, modificationCallbackFactory, true);
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(insertDeleteOp,
splitsAndConstraint.second);
} catch (MetadataException me) {
@@ -1041,17 +1039,17 @@
SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
jobId, datasetId, primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE);
- LSMTreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new LSMTreeIndexInsertUpdateDeleteOperatorDescriptor(
+ AsterixLSMTreeInsertDeleteOperatorDescriptor btreeBulkLoad = new AsterixLSMTreeInsertDeleteOperatorDescriptor(
spec, recordDesc, appContext.getStorageManagerInterface(),
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp,
- new LSMBTreeDataflowHelperFactory(AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, storageProperties
- .getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
- storageProperties.getBloomFilterFalsePositiveRate()), filterFactory,
- modificationCallbackFactory);
+ new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
+ AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER, storageProperties
+ .getBloomFilterFalsePositiveRate()), filterFactory, modificationCallbackFactory,
+ false);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
splitsAndConstraint.second);
} catch (MetadataException e) {
@@ -1168,18 +1166,16 @@
SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
jobId, datasetId, primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_INVERTED_INDEX);
- LSMInvertedIndexInsertUpdateDeleteOperator insertDeleteOp = new LSMInvertedIndexInsertUpdateDeleteOperator(
+ AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor insertDeleteOp = new AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor(
spec, recordDesc, appContext.getStorageManagerInterface(), splitsAndConstraint.first,
appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
- new LSMInvertedIndexDataflowHelperFactory(
+ new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER, storageProperties
- .getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages(),
- storageProperties.getBloomFilterFalsePositiveRate()), filterFactory,
- modificationCallbackFactory);
+ .getBloomFilterFalsePositiveRate()), filterFactory, modificationCallbackFactory);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(insertDeleteOp,
splitsAndConstraint.second);
} catch (MetadataException e) {
@@ -1263,20 +1259,19 @@
SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
jobId, datasetId, primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE);
- LSMTreeIndexInsertUpdateDeleteOperatorDescriptor rtreeUpdate = new LSMTreeIndexInsertUpdateDeleteOperatorDescriptor(
+ AsterixLSMTreeInsertDeleteOperatorDescriptor rtreeUpdate = new AsterixLSMTreeInsertDeleteOperatorDescriptor(
spec, recordDesc, appContext.getStorageManagerInterface(),
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, null, fieldPermutation, indexOp, new LSMRTreeDataflowHelperFactory(
valueProviderFactories, RTreePolicyType.RTREE, primaryComparatorFactories,
+ new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, proposeLinearizer(
nestedKeyType.getTypeTag(), comparatorFactories.length),
- storageProperties.getMemoryComponentPageSize(),
- storageProperties.getMemoryComponentNumPages(),
storageProperties.getBloomFilterFalsePositiveRate()), filterFactory,
- modificationCallbackFactory);
+ modificationCallbackFactory, false);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeUpdate, splitsAndConstraint.second);
} catch (MetadataException | IOException e) {
throw new AlgebricksException(e);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTracker.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTracker.java
deleted file mode 100644
index 9da0a8f..0000000
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTracker.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.asterix.transaction.management.opcallbacks;
-
-import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
-import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
-
-public class IndexOperationTracker implements ILSMOperationTracker {
-
- // Number of active operations on a ILSMIndex instance.
- private AtomicInteger numActiveOperations;
- private long lastLSN;
- private long firstLSN;
- private final ILSMIndex index;
- private final ILSMIOOperationCallback ioOpCallback;
- private ILSMIndexAccessor accessor;
-
- public IndexOperationTracker(ILSMIndex index, ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
- this.numActiveOperations = new AtomicInteger(0);
- this.index = index;
- //TODO
- //This code is added to avoid NullPointException when the index's comparatorFactory is null.
- //The null comparator factory is set in the constructor of the IndexDropOperatorDescriptor.
- if (ioOpCallbackFactory != null) {
- ioOpCallback = ioOpCallbackFactory.createIOOperationCallback(this);
- } else {
- ioOpCallback = NoOpIOOperationCallback.INSTANCE;
- }
- resetLSNs();
- }
-
- @Override
- public void beforeOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
- IModificationOperationCallback modificationCallback) throws HyracksDataException {
- if (opType != LSMOperationType.FORCE_MODIFICATION) {
- numActiveOperations.incrementAndGet();
-
- // Increment transactor-local active operations count.
- AbstractOperationCallback opCallback = getOperationCallback(searchCallback, modificationCallback);
- if (opCallback != null) {
- opCallback.incrementLocalNumActiveOperations();
- }
- }
- }
-
- @Override
- public void afterOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
- IModificationOperationCallback modificationCallback) throws HyracksDataException {
- // Searches are immediately considered complete, because they should not prevent the execution of flushes.
- if (searchCallback != null) {
- completeOperation(opType, searchCallback, modificationCallback);
- }
- }
-
- @Override
- public void completeOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
- IModificationOperationCallback modificationCallback) throws HyracksDataException {
-
- // Decrement transactor-local active operations count.
- AbstractOperationCallback opCallback = getOperationCallback(searchCallback, modificationCallback);
- if (opCallback != null) {
- opCallback.decrementLocalNumActiveOperations();
- }
- // If we need a flush, and this is the last completing operation, then schedule the flush.
- // Once the flush has completed notify all waiting operations.
- if (index.getFlushStatus() && numActiveOperations.decrementAndGet() == 0 && opType != LSMOperationType.FLUSH) {
- if (accessor == null) {
- accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE);
- }
- accessor.scheduleFlush(ioOpCallback);
- }
- }
-
- private AbstractOperationCallback getOperationCallback(ISearchOperationCallback searchCallback,
- IModificationOperationCallback modificationCallback) {
-
- if (searchCallback == NoOpOperationCallback.INSTANCE || modificationCallback == NoOpOperationCallback.INSTANCE) {
- return null;
- }
- if (searchCallback != null) {
- return (AbstractOperationCallback) searchCallback;
- } else {
- return (AbstractOperationCallback) modificationCallback;
- }
- }
-
- public ILSMIOOperationCallback getIOOperationCallback() {
- return ioOpCallback;
- }
-
- public long getLastLSN() {
- return lastLSN;
- }
-
- public long getFirstLSN() {
- return firstLSN;
- }
-
- public void updateLastLSN(long lastLSN) {
- if (firstLSN == -1) {
- firstLSN = lastLSN;
- }
- this.lastLSN = Math.max(this.lastLSN, lastLSN);
- }
-
- public void resetLSNs() {
- lastLSN = -1;
- firstLSN = -1;
- }
-}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTrackerFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTrackerFactory.java
deleted file mode 100644
index 032a4f9..0000000
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTrackerFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.asterix.transaction.management.opcallbacks;
-
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
-
-public class IndexOperationTrackerFactory implements ILSMOperationTrackerFactory {
-
- private static final long serialVersionUID = 1L;
-
- private final ILSMIOOperationCallbackFactory ioOpCallbackFactory;
-
- public IndexOperationTrackerFactory(ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
- this.ioOpCallbackFactory = ioOpCallbackFactory;
- }
-
- @Override
- public ILSMOperationTracker createOperationTracker(ILSMIndex index) {
- return new IndexOperationTracker(index, ioOpCallbackFactory);
- }
-
-}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
index 15ccde4..9097570 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
@@ -41,9 +41,9 @@
protected final IndexOperation indexOp;
protected final ITransactionSubsystem txnSubsystem;
- public PrimaryIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields,
- ITransactionContext txnCtx, ILockManager lockManager,
- ITransactionSubsystem txnSubsystem, long resourceId, byte resourceType, IndexOperation indexOp) {
+ public PrimaryIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
+ ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId, byte resourceType,
+ IndexOperation indexOp) {
super(datasetId, primaryKeyFields, txnCtx, lockManager);
this.resourceId = resourceId;
this.resourceType = resourceType;
@@ -63,19 +63,19 @@
@Override
public void found(ITupleReference before, ITupleReference after) throws HyracksDataException {
- ILogger logger = txnSubsystem.getTreeLoggerRepository().getIndexLogger(resourceId, resourceType);
- int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
- LSMBTreeTupleReference lsmBTreeTuple = (LSMBTreeTupleReference) before;
- IndexOperation oldOp = IndexOperation.INSERT;
- if (before == null) {
- oldOp = IndexOperation.NOOP;
- }
- if (lsmBTreeTuple != null && lsmBTreeTuple.isAntimatter()) {
- oldOp = IndexOperation.DELETE;
- }
try {
- ((IndexLogger)logger).generateLogRecord(txnSubsystem, txnCtx, datasetId.getId(), pkHash, resourceId, indexOp, after,
- oldOp, before);
+ ILogger logger = txnSubsystem.getTreeLoggerRepository().getIndexLogger(resourceId, resourceType);
+ int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
+ LSMBTreeTupleReference lsmBTreeTuple = (LSMBTreeTupleReference) before;
+ IndexOperation oldOp = IndexOperation.INSERT;
+ if (before == null) {
+ oldOp = IndexOperation.NOOP;
+ }
+ if (lsmBTreeTuple != null && lsmBTreeTuple.isAntimatter()) {
+ oldOp = IndexOperation.DELETE;
+ }
+ ((IndexLogger) logger).generateLogRecord(txnSubsystem, txnCtx, datasetId.getId(), pkHash, resourceId,
+ indexOp, after, oldOp, before);
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerProvider.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerProvider.java
new file mode 100644
index 0000000..7b7198e
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerProvider.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.opcallbacks;
+
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.context.DatasetLifecycleManager;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
+
+public class PrimaryIndexOperationTrackerProvider implements ILSMOperationTrackerProvider {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int datasetID;
+
+ public PrimaryIndexOperationTrackerProvider(int datasetID) {
+ this.datasetID = datasetID;
+ }
+
+ @Override
+ public ILSMOperationTracker createOperationTracker(IHyracksTaskContext ctx) {
+ DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) ((IAsterixAppRuntimeContext) ctx
+ .getJobletContext().getApplicationContext().getApplicationObject()).getIndexLifecycleManager();
+ return dslcManager.getOperationTracker(datasetID);
+ }
+
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
index 0e66b32..2287ed7 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
@@ -61,9 +61,10 @@
@Override
public void found(ITupleReference before, ITupleReference after) throws HyracksDataException {
- ILogger logger = txnSubsystem.getTreeLoggerRepository().getIndexLogger(resourceId, resourceType);
- int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
+
try {
+ ILogger logger = txnSubsystem.getTreeLoggerRepository().getIndexLogger(resourceId, resourceType);
+ int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
IndexOperation effectiveOldOp;
if (resourceType == ResourceType.LSM_BTREE) {
LSMBTreeTupleReference lsmBTreeTuple = (LSMBTreeTupleReference) before;
@@ -77,8 +78,8 @@
} else {
effectiveOldOp = oldOp;
}
- ((IndexLogger)logger).generateLogRecord(txnSubsystem, txnCtx, datasetId.getId(), pkHash, resourceId, indexOp, after,
- effectiveOldOp, before);
+ ((IndexLogger) logger).generateLogRecord(txnSubsystem, txnCtx, datasetId.getId(), pkHash, resourceId,
+ indexOp, after, effectiveOldOp, before);
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
new file mode 100644
index 0000000..4703875
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
@@ -0,0 +1,24 @@
+package edu.uci.ics.asterix.transaction.management.opcallbacks;
+
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
+
+public class SecondaryIndexOperationTrackerFactory implements ILSMOperationTrackerProvider {
+
+ private static final long serialVersionUID = 1L;
+
+ private final ILSMIOOperationCallbackFactory ioOpCallbackFactory;
+
+ public SecondaryIndexOperationTrackerFactory(ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
+ this.ioOpCallbackFactory = ioOpCallbackFactory;
+ }
+
+ @Override
+ public ILSMOperationTracker createOperationTracker(IHyracksTaskContext ctx) {
+ return new BaseOperationTracker(ioOpCallbackFactory);
+ }
+
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/AbstractLSMLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/AbstractLSMLocalResourceMetadata.java
new file mode 100644
index 0000000..a95b677
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/AbstractLSMLocalResourceMetadata.java
@@ -0,0 +1,18 @@
+package edu.uci.ics.asterix.transaction.management.resource;
+
+import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
+
+public abstract class AbstractLSMLocalResourceMetadata implements ILocalResourceMetadata {
+
+ private static final long serialVersionUID = 1L;
+
+ protected final int datasetID;
+
+ public AbstractLSMLocalResourceMetadata(int datasetID) {
+ this.datasetID = datasetID;
+ }
+
+ public int getDatasetID() {
+ return datasetID;
+ }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
index c3f9747..ac4ca61 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
@@ -2,52 +2,46 @@
import java.io.File;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree;
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.IInMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
-import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
-import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
-import edu.uci.ics.hyracks.storage.common.file.TransientFileMapManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
-public class LSMBTreeLocalResourceMetadata implements ILocalResourceMetadata {
+public class LSMBTreeLocalResourceMetadata extends AbstractLSMLocalResourceMetadata {
private static final long serialVersionUID = 1L;
private final ITypeTraits[] typeTraits;
private final IBinaryComparatorFactory[] cmpFactories;
private final int[] bloomFilterKeyFields;
- private final int memPageSize;
- private final int memNumPages;
+ private final boolean isPrimary;
private FileSplit[] fileSplits;
private int ioDeviceID;
public LSMBTreeLocalResourceMetadata(ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories,
- int[] bloomFilterKeyFields, boolean isPrimary, int memPageSize, int memNumPages, FileSplit[] fileSplits) {
+ int[] bloomFilterKeyFields, boolean isPrimary, FileSplit[] fileSplits, int datasetID) {
+ super(datasetID);
this.typeTraits = typeTraits;
this.cmpFactories = cmpFactories;
this.bloomFilterKeyFields = bloomFilterKeyFields;
- this.memPageSize = memPageSize;
- this.memNumPages = memNumPages;
+ this.isPrimary = isPrimary;
this.fileSplits = fileSplits;
}
public LSMBTreeLocalResourceMetadata(ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories,
- int[] bloomFilterKeyFields, boolean isPrimary, int memPageSize, int memNumPages, int ioDeviceID) {
+ int[] bloomFilterKeyFields, boolean isPrimary, int ioDeviceID, int datasetID) {
+ super(datasetID);
this.typeTraits = typeTraits;
this.cmpFactories = cmpFactories;
this.bloomFilterKeyFields = bloomFilterKeyFields;
- this.memPageSize = memPageSize;
- this.memNumPages = memNumPages;
+ this.isPrimary = isPrimary;
this.ioDeviceID = ioDeviceID;
}
@@ -55,15 +49,13 @@
public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider runtimeContextProvider, String filePath,
int partition) {
FileReference file = new FileReference(new File(filePath));
- IInMemoryBufferCache memBufferCache = new InMemoryBufferCache(new HeapBufferAllocator(), memPageSize,
- memNumPages, new TransientFileMapManager());
- ITreeIndexMetaDataFrameFactory metaDataFrameFactory = new LIFOMetaDataFrameFactory();
- IInMemoryFreePageManager memFreePageManager = new InMemoryFreePageManager(memNumPages, metaDataFrameFactory);
- LSMBTree lsmBTree = LSMBTreeUtils.createLSMTree(memBufferCache, memFreePageManager, runtimeContextProvider
- .getIOManager(), file, runtimeContextProvider.getBufferCache(), runtimeContextProvider
- .getFileMapManager(), typeTraits, cmpFactories, bloomFilterKeyFields, runtimeContextProvider
- .getBloomFilterFalsePositiveRate(), runtimeContextProvider.getLSMMergePolicy(), runtimeContextProvider
- .getLSMBTreeOperationTrackerFactory(), runtimeContextProvider.getLSMIOScheduler(),
+ IVirtualBufferCache virtualBufferCache = runtimeContextProvider.getVirtualBufferCache(datasetID);
+ LSMBTree lsmBTree = LSMBTreeUtils.createLSMTree(virtualBufferCache, runtimeContextProvider.getIOManager(),
+ file, runtimeContextProvider.getBufferCache(), runtimeContextProvider.getFileMapManager(), typeTraits,
+ cmpFactories, bloomFilterKeyFields, runtimeContextProvider.getBloomFilterFalsePositiveRate(),
+ runtimeContextProvider.getLSMMergePolicy(),
+ isPrimary ? runtimeContextProvider.getLSMBTreeOperationTracker(datasetID) : new BaseOperationTracker(
+ LSMBTreeIOOperationCallbackFactory.INSTANCE), runtimeContextProvider.getLSMIOScheduler(),
runtimeContextProvider.getLSMBTreeIOOperationCallbackProvider(), fileSplits == null ? ioDeviceID
: fileSplits[partition].getIODeviceId());
return lsmBTree;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
index 89a32c5..5d9ac75 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
@@ -1,23 +1,19 @@
package edu.uci.ics.asterix.transaction.management.resource;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.IInMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.DualIndexInMemoryBufferCache;
-import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.DualIndexInMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.InvertedIndexUtils;
-import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
-public class LSMInvertedIndexLocalResourceMetadata implements ILocalResourceMetadata {
+public class LSMInvertedIndexLocalResourceMetadata extends AbstractLSMLocalResourceMetadata {
private static final long serialVersionUID = 1L;
@@ -26,22 +22,19 @@
private final ITypeTraits[] tokenTypeTraits;
private final IBinaryComparatorFactory[] tokenCmpFactories;
private final IBinaryTokenizerFactory tokenizerFactory;
- private final int memPageSize;
- private final int memNumPages;
private final boolean isPartitioned;
private final FileSplit[] fileSplits;
public LSMInvertedIndexLocalResourceMetadata(ITypeTraits[] invListTypeTraits,
IBinaryComparatorFactory[] invListCmpFactories, ITypeTraits[] tokenTypeTraits,
- IBinaryComparatorFactory[] tokenCmpFactories, IBinaryTokenizerFactory tokenizerFactory, int memPageSize,
- int memNumPages, boolean isPartitioned, FileSplit[] fileSplits) {
+ IBinaryComparatorFactory[] tokenCmpFactories, IBinaryTokenizerFactory tokenizerFactory,
+ boolean isPartitioned, FileSplit[] fileSplits, int datasetID) {
+ super(datasetID);
this.invListTypeTraits = invListTypeTraits;
this.invListCmpFactories = invListCmpFactories;
this.tokenTypeTraits = tokenTypeTraits;
this.tokenCmpFactories = tokenCmpFactories;
this.tokenizerFactory = tokenizerFactory;
- this.memPageSize = memPageSize;
- this.memNumPages = memNumPages;
this.isPartitioned = isPartitioned;
this.fileSplits = fileSplits;
}
@@ -49,35 +42,28 @@
@Override
public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider runtimeContextProvider, String filePath,
int partition) throws HyracksDataException {
-
- ITreeIndexMetaDataFrameFactory metaDataFrameFactory = new LIFOMetaDataFrameFactory();
- IInMemoryBufferCache memBufferCache = new DualIndexInMemoryBufferCache(new HeapBufferAllocator(), memPageSize,
- memNumPages);
- IInMemoryFreePageManager memFreePageManager = new DualIndexInMemoryFreePageManager(memNumPages,
- metaDataFrameFactory);
+ IVirtualBufferCache virtualBufferCache = runtimeContextProvider.getVirtualBufferCache(datasetID);
try {
if (isPartitioned) {
- return InvertedIndexUtils.createPartitionedLSMInvertedIndex(memBufferCache, memFreePageManager,
- runtimeContextProvider.getFileMapManager(), invListTypeTraits, invListCmpFactories,
- tokenTypeTraits, tokenCmpFactories, tokenizerFactory, runtimeContextProvider.getBufferCache(),
- runtimeContextProvider.getIOManager(), filePath,
- runtimeContextProvider.getBloomFilterFalsePositiveRate(),
- runtimeContextProvider.getLSMMergePolicy(),
- runtimeContextProvider.getLSMInvertedIndexOperationTrackerFactory(),
- runtimeContextProvider.getLSMIOScheduler(),
- runtimeContextProvider.getLSMInvertedIndexIOOperationCallbackProvider(),
- fileSplits[partition].getIODeviceId());
+ return InvertedIndexUtils.createPartitionedLSMInvertedIndex(virtualBufferCache, runtimeContextProvider
+ .getFileMapManager(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
+ tokenCmpFactories, tokenizerFactory, runtimeContextProvider.getBufferCache(),
+ runtimeContextProvider.getIOManager(), filePath, runtimeContextProvider
+ .getBloomFilterFalsePositiveRate(), runtimeContextProvider.getLSMMergePolicy(),
+ new BaseOperationTracker(LSMInvertedIndexIOOperationCallbackFactory.INSTANCE),
+ runtimeContextProvider.getLSMIOScheduler(), runtimeContextProvider
+ .getLSMInvertedIndexIOOperationCallbackProvider(), fileSplits[partition]
+ .getIODeviceId());
} else {
- return InvertedIndexUtils.createLSMInvertedIndex(memBufferCache, memFreePageManager,
- runtimeContextProvider.getFileMapManager(), invListTypeTraits, invListCmpFactories,
- tokenTypeTraits, tokenCmpFactories, tokenizerFactory, runtimeContextProvider.getBufferCache(),
- runtimeContextProvider.getIOManager(), filePath,
- runtimeContextProvider.getBloomFilterFalsePositiveRate(),
- runtimeContextProvider.getLSMMergePolicy(),
- runtimeContextProvider.getLSMInvertedIndexOperationTrackerFactory(),
- runtimeContextProvider.getLSMIOScheduler(),
- runtimeContextProvider.getLSMInvertedIndexIOOperationCallbackProvider(),
- fileSplits[partition].getIODeviceId());
+ return InvertedIndexUtils.createLSMInvertedIndex(virtualBufferCache, runtimeContextProvider
+ .getFileMapManager(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
+ tokenCmpFactories, tokenizerFactory, runtimeContextProvider.getBufferCache(),
+ runtimeContextProvider.getIOManager(), filePath, runtimeContextProvider
+ .getBloomFilterFalsePositiveRate(), runtimeContextProvider.getLSMMergePolicy(),
+ new BaseOperationTracker(LSMInvertedIndexIOOperationCallbackFactory.INSTANCE),
+ runtimeContextProvider.getLSMIOScheduler(), runtimeContextProvider
+ .getLSMInvertedIndexIOOperationCallbackProvider(), fileSplits[partition]
+ .getIODeviceId());
}
} catch (IndexException e) {
throw new HyracksDataException(e);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
index 2506697..8ad9225 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
@@ -2,6 +2,8 @@
import java.io.File;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
@@ -9,20 +11,14 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
-import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.IInMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.DualIndexInMemoryBufferCache;
-import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.DualIndexInMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.utils.LSMRTreeUtils;
import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreePolicyType;
-import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
-public class LSMRTreeLocalResourceMetadata implements ILocalResourceMetadata {
+public class LSMRTreeLocalResourceMetadata extends AbstractLSMLocalResourceMetadata {
private static final long serialVersionUID = 1L;
@@ -32,22 +28,19 @@
private final IPrimitiveValueProviderFactory[] valueProviderFactories;
private final RTreePolicyType rtreePolicyType;
private final ILinearizeComparatorFactory linearizeCmpFactory;
- private final int memPageSize;
- private final int memNumPages;
private final FileSplit[] fileSplits;
public LSMRTreeLocalResourceMetadata(ITypeTraits[] typeTraits, IBinaryComparatorFactory[] rtreeCmpFactories,
IBinaryComparatorFactory[] btreeCmpFactories, IPrimitiveValueProviderFactory[] valueProviderFactories,
- RTreePolicyType rtreePolicyType, ILinearizeComparatorFactory linearizeCmpFactory, int memPageSize,
- int memNumPages, FileSplit[] fileSplits) {
+ RTreePolicyType rtreePolicyType, ILinearizeComparatorFactory linearizeCmpFactory, FileSplit[] fileSplits,
+ int datasetID) {
+ super(datasetID);
this.typeTraits = typeTraits;
this.rtreeCmpFactories = rtreeCmpFactories;
this.btreeCmpFactories = btreeCmpFactories;
this.valueProviderFactories = valueProviderFactories;
this.rtreePolicyType = rtreePolicyType;
this.linearizeCmpFactory = linearizeCmpFactory;
- this.memPageSize = memPageSize;
- this.memNumPages = memNumPages;
this.fileSplits = fileSplits;
}
@@ -55,22 +48,16 @@
public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider runtimeContextProvider, String filePath,
int partition) throws HyracksDataException {
FileReference file = new FileReference(new File(filePath));
- ITreeIndexMetaDataFrameFactory metaDataFrameFactory = new LIFOMetaDataFrameFactory();
- IInMemoryBufferCache memBufferCache = new DualIndexInMemoryBufferCache(new HeapBufferAllocator(), memPageSize,
- memNumPages);
- IInMemoryFreePageManager memFreePageManager = new DualIndexInMemoryFreePageManager(memNumPages,
- metaDataFrameFactory);
-
+ IVirtualBufferCache virtualBufferCache = runtimeContextProvider.getVirtualBufferCache(datasetID);
try {
- return LSMRTreeUtils.createLSMTree(memBufferCache, memFreePageManager,
- runtimeContextProvider.getIOManager(), file, runtimeContextProvider.getBufferCache(),
- runtimeContextProvider.getFileMapManager(), typeTraits, rtreeCmpFactories, btreeCmpFactories,
- valueProviderFactories, rtreePolicyType, runtimeContextProvider.getBloomFilterFalsePositiveRate(),
- runtimeContextProvider.getLSMMergePolicy(),
- runtimeContextProvider.getLSMRTreeOperationTrackerFactory(),
- runtimeContextProvider.getLSMIOScheduler(),
- runtimeContextProvider.getLSMRTreeIOOperationCallbackProvider(), linearizeCmpFactory,
- fileSplits[partition].getIODeviceId());
+ return LSMRTreeUtils.createLSMTree(virtualBufferCache, runtimeContextProvider.getIOManager(), file,
+ runtimeContextProvider.getBufferCache(), runtimeContextProvider.getFileMapManager(), typeTraits,
+ rtreeCmpFactories, btreeCmpFactories, valueProviderFactories, rtreePolicyType,
+ runtimeContextProvider.getBloomFilterFalsePositiveRate(), runtimeContextProvider
+ .getLSMMergePolicy(),
+ new BaseOperationTracker(LSMRTreeIOOperationCallbackFactory.INSTANCE), runtimeContextProvider
+ .getLSMIOScheduler(), runtimeContextProvider.getLSMRTreeIOOperationCallbackProvider(),
+ linearizeCmpFactory, fileSplits[partition].getIODeviceId());
} catch (TreeIndexException e) {
throw new HyracksDataException(e);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceFactory.java
index ed0f79f..707e02e 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceFactory.java
@@ -1,5 +1,6 @@
package edu.uci.ics.asterix.transaction.management.resource;
+import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
import edu.uci.ics.hyracks.storage.common.file.ILocalResourceFactory;
import edu.uci.ics.hyracks.storage.common.file.LocalResource;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceFactoryProvider.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceFactoryProvider.java
index 4157c32..6afb9a9 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceFactoryProvider.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceFactoryProvider.java
@@ -1,5 +1,6 @@
package edu.uci.ics.asterix.transaction.management.resource;
+import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
import edu.uci.ics.hyracks.storage.common.file.ILocalResourceFactory;
import edu.uci.ics.hyracks.storage.common.file.ILocalResourceFactoryProvider;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index 8fb7494..bfdda15 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -110,6 +110,9 @@
@Override
public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
throws ACIDException {
+ if (entityHashValue == 379839425) {
+ System.out.println("break");
+ }
internalLock(datasetId, entityHashValue, lockMode, txnContext, false);
}
@@ -633,14 +636,15 @@
}
@Override
- public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext) throws ACIDException {
- internalUnlock(datasetId, entityHashValue, txnContext, false, false);
+ public boolean unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext)
+ throws ACIDException {
+ return internalUnlock(datasetId, entityHashValue, txnContext, false, false);
}
@Override
- public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext, boolean commitFlag)
+ public boolean unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext, boolean commitFlag)
throws ACIDException {
- internalUnlock(datasetId, entityHashValue, txnContext, false, commitFlag);
+ return internalUnlock(datasetId, entityHashValue, txnContext, false, commitFlag);
}
private void instantUnlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext)
@@ -648,7 +652,7 @@
internalUnlock(datasetId, entityHashValue, txnContext, true, false);
}
- private void internalUnlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext,
+ private boolean internalUnlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext,
boolean isInstant, boolean commitFlag) throws ACIDException {
JobId jobId = txnContext.getJobId();
int eLockInfo = -1;
@@ -657,6 +661,7 @@
int entityInfo = -1;
byte datasetLockMode;
+ boolean lockCountIsZero = false;
if (IS_DEBUG_MODE) {
if (entityHashValue == -1) {
throw new UnsupportedOperationException(
@@ -704,6 +709,7 @@
if (entityInfoManager.getEntityLockCount(entityInfo) == 0
&& entityInfoManager.getDatasetLockCount(entityInfo) == 0) {
+ lockCountIsZero = true;
int threadCount = 0; //number of threads(in the same job) waiting on the same resource
int waiterObjId = jobInfo.getFirstWaitingResource();
int waitingEntityInfo;
@@ -783,6 +789,7 @@
} finally {
unlatchLockTable();
}
+ return lockCountIsZero;
}
@Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLoggerRepository.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLoggerRepository.java
index c51bb11..bd7d159 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLoggerRepository.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLoggerRepository.java
@@ -17,10 +17,12 @@
import java.util.HashMap;
import java.util.Map;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.transactions.ILogger;
import edu.uci.ics.asterix.common.transactions.ILoggerRepository;
import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
import edu.uci.ics.asterix.common.transactions.MutableResourceId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
public class IndexLoggerRepository implements ILoggerRepository {
@@ -35,13 +37,18 @@
}
@Override
- public synchronized ILogger getIndexLogger(long resourceId, byte resourceType) {
+ public synchronized ILogger getIndexLogger(long resourceId, byte resourceType) throws ACIDException {
mutableResourceId.setId(resourceId);
ILogger logger = loggers.get(mutableResourceId);
if (logger == null) {
MutableResourceId newMutableResourceId = new MutableResourceId(resourceId);
- IIndex index = (IIndex) txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager()
- .getIndex(resourceId);
+ IIndex index;
+ try {
+ index = (IIndex) txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager()
+ .getIndex(resourceId);
+ } catch (HyracksDataException e) {
+ throw new ACIDException(e);
+ }
logger = new IndexLogger(resourceId, resourceType, index);
loggers.put(newMutableResourceId, logger);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
index ce3401c..8506df7 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
@@ -19,6 +19,7 @@
import edu.uci.ics.asterix.common.transactions.IResourceManager;
import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
import edu.uci.ics.asterix.common.transactions.LogicalLogLocator;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -45,11 +46,13 @@
long resourceId = logRecordHelper.getResourceId(logLocator);
int offset = logRecordHelper.getLogContentBeginPos(logLocator);
- //TODO
- //replace TransactionResourceRepository with IndexLifeCycleManager
- // look up the repository to obtain the resource object
- IIndex index = (IIndex) txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager()
- .getIndex(resourceId);
+ IIndex index;
+ try {
+ index = (IIndex) txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager()
+ .getIndex(resourceId);
+ } catch (HyracksDataException e1) {
+ throw new ACIDException("Cannot undo: unable to find index");
+ }
/* field count */
int fieldCount = logLocator.getBuffer().readInt(offset);
@@ -115,8 +118,13 @@
long resourceId = logRecordHelper.getResourceId(logLocator);
int offset = logRecordHelper.getLogContentBeginPos(logLocator);
- IIndex index = (IIndex) txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager()
- .getIndex(resourceId);
+ IIndex index;
+ try {
+ index = (IIndex) txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager()
+ .getIndex(resourceId);
+ } catch (HyracksDataException e1) {
+ throw new ACIDException("Cannot redo: unable to find index");
+ }
/* field count */
int fieldCount = logLocator.getBuffer().readInt(offset);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 318996a..fc07457 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -27,6 +27,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -45,7 +46,6 @@
import edu.uci.ics.asterix.common.transactions.LogicalLogLocator;
import edu.uci.ics.asterix.common.transactions.PhysicalLogLocator;
import edu.uci.ics.asterix.common.transactions.ReusableLogContentObject;
-import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -425,6 +425,14 @@
logPages[pageIndex].setBufferNextWriteOffset(bufferNextWriteOffset);
if (logType != LogType.ENTITY_COMMIT) {
+ if (logType == LogType.COMMIT) {
+ int count = txnCtx.getActiveOperationCountOnIndexes();
+ map = activeTxnCountMaps.get(pageIndex);
+ if (map.containsKey(txnCtx)) {
+ count += (Integer) map.get(txnCtx);
+ }
+ map.put(txnCtx, count);
+ }
// release the ownership as the log record has been placed in
// created space.
logPages[pageIndex].decRefCnt();
@@ -733,6 +741,8 @@
return provider;
}
+ static AtomicInteger t = new AtomicInteger();
+
public void decrementActiveTxnCountOnIndexes(int pageIndex) throws HyracksDataException {
ITransactionContext ctx = null;
int count = 0;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
index a934d6c..b59c48f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
@@ -2,9 +2,9 @@
import java.util.List;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.transactions.IRecoveryManager;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -56,7 +56,7 @@
long firstLSN;
if (openIndexList.size() > 0) {
for (IIndex index : openIndexList) {
- firstLSN = ((IndexOperationTracker) ((ILSMIndex) index).getOperationTracker()).getFirstLSN();
+ firstLSN = ((BaseOperationTracker) ((ILSMIndex) index).getOperationTracker()).getFirstLSN();
minMCTFirstLSN = Math.min(minMCTFirstLSN, firstLSN);
}
} else {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 50d4625..b262fab 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -34,6 +34,8 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.asterix.common.transactions.IBuffer;
@@ -47,8 +49,6 @@
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
import edu.uci.ics.asterix.common.transactions.LogicalLogLocator;
import edu.uci.ics.asterix.common.transactions.PhysicalLogLocator;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
-import edu.uci.ics.asterix.transaction.management.resource.ILocalResourceMetadata;
import edu.uci.ics.asterix.transaction.management.service.logging.IndexResourceManager;
import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
@@ -430,7 +430,7 @@
if (isSharpCheckpoint && LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting sharp checkpoint ... ");
}
-
+
LogManager logMgr = (LogManager) txnSubsystem.getLogManager();
TransactionManager txnMgr = (TransactionManager) txnSubsystem.getTransactionManager();
String logDir = logMgr.getLogManagerProperties().getLogDir();
@@ -450,7 +450,7 @@
ILSMIndex lsmIndex = (ILSMIndex) index;
ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
- IndexOperationTracker indexOpTracker = (IndexOperationTracker) lsmIndex.getOperationTracker();
+ BaseOperationTracker indexOpTracker = (BaseOperationTracker) lsmIndex.getOperationTracker();
BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(
indexOpTracker.getIOOperationCallback());
callbackList.add(cb);
@@ -475,7 +475,7 @@
long firstLSN;
if (openIndexList.size() > 0) {
for (IIndex index : openIndexList) {
- firstLSN = ((IndexOperationTracker) ((ILSMIndex) index).getOperationTracker()).getFirstLSN();
+ firstLSN = ((BaseOperationTracker) ((ILSMIndex) index).getOperationTracker()).getFirstLSN();
minMCTFirstLSN = Math.min(minMCTFirstLSN, firstLSN);
}
} else {
@@ -522,7 +522,7 @@
if (isSharpCheckpoint) {
logMgr.renewLogFiles();
}
-
+
if (isSharpCheckpoint && LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Completed sharp checkpoint.");
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
index 2b3d6cd..cf623a1 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
@@ -1,11 +1,10 @@
package edu.uci.ics.asterix.transaction.management.service.transaction;
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
-import edu.uci.ics.asterix.common.api.IAsterixRuntimeComponentsProvider;
-import edu.uci.ics.asterix.transaction.management.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import edu.uci.ics.asterix.transaction.management.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
-import edu.uci.ics.asterix.transaction.management.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
@@ -18,40 +17,46 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactory;
-public class AsterixRuntimeComponentsProvider implements
- IIndexLifecycleManagerProvider, IStorageManagerInterface, ILSMIOOperationSchedulerProvider,
- ILSMMergePolicyProvider, ILSMOperationTrackerFactory, ILSMIOOperationCallbackProvider {
+public class AsterixRuntimeComponentsProvider implements IIndexLifecycleManagerProvider, IStorageManagerInterface,
+ ILSMIOOperationSchedulerProvider, ILSMMergePolicyProvider, ILSMOperationTrackerProvider,
+ ILSMIOOperationCallbackProvider {
private static final long serialVersionUID = 1L;
private final ILSMIOOperationCallbackFactory ioOpCallbackFactory;
+ private final boolean isSecondary;
- public static final AsterixRuntimeComponentsProvider LSMBTREE_PROVIDER = new AsterixRuntimeComponentsProvider(
- LSMBTreeIOOperationCallbackFactory.INSTANCE);
+ public static final AsterixRuntimeComponentsProvider LSMBTREE_PRIMARY_PROVIDER = new AsterixRuntimeComponentsProvider(
+ LSMBTreeIOOperationCallbackFactory.INSTANCE, false);
+ public static final AsterixRuntimeComponentsProvider LSMBTREE_SECONDARY_PROVIDER = new AsterixRuntimeComponentsProvider(
+ LSMBTreeIOOperationCallbackFactory.INSTANCE, true);
public static final AsterixRuntimeComponentsProvider LSMRTREE_PROVIDER = new AsterixRuntimeComponentsProvider(
- LSMRTreeIOOperationCallbackFactory.INSTANCE);
+ LSMRTreeIOOperationCallbackFactory.INSTANCE, true);
public static final AsterixRuntimeComponentsProvider LSMINVERTEDINDEX_PROVIDER = new AsterixRuntimeComponentsProvider(
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE);
- public static final AsterixRuntimeComponentsProvider NOINDEX_PROVIDER = new AsterixRuntimeComponentsProvider(null);
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, true);
+ public static final AsterixRuntimeComponentsProvider NOINDEX_PROVIDER = new AsterixRuntimeComponentsProvider(null,
+ false);
- private AsterixRuntimeComponentsProvider(ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
+ private AsterixRuntimeComponentsProvider(ILSMIOOperationCallbackFactory ioOpCallbackFactory, boolean isSecondary) {
this.ioOpCallbackFactory = ioOpCallbackFactory;
+ this.isSecondary = isSecondary;
}
@Override
- public ILSMOperationTracker createOperationTracker(ILSMIndex index) {
- return new IndexOperationTracker(index, ioOpCallbackFactory);
+ public ILSMOperationTracker createOperationTracker(IHyracksTaskContext ctx) {
+ assert isSecondary;
+ return new BaseOperationTracker(ioOpCallbackFactory);
}
@Override
public ILSMIOOperationCallback getIOOperationCallback(ILSMIndex index) {
- return ((IndexOperationTracker) index.getOperationTracker()).getIOOperationCallback();
+ return ((BaseOperationTracker) index.getOperationTracker()).getIOOperationCallback();
}
@Override
@@ -96,5 +101,4 @@
.getResourceIdFactory();
}
-
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index 2659d8f..4bdad6b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -15,11 +15,11 @@
package edu.uci.ics.asterix.transaction.management.service.transaction;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.HashSet;
-import java.util.List;
+import java.util.Iterator;
import java.util.Set;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
import edu.uci.ics.asterix.common.transactions.ICloseable;
@@ -27,7 +27,6 @@
import edu.uci.ics.asterix.common.transactions.ITransactionManager.TransactionState;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.common.transactions.LogicalLogLocator;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
import edu.uci.ics.asterix.transaction.management.service.logging.LogUtil;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
@@ -55,11 +54,11 @@
private JobId jobId;
// List of indexes on which operations were performed on behalf of this transaction.
- private final List<ILSMIndex> indexes = new ArrayList<ILSMIndex>();
+ private final Set<ILSMIndex> indexes = new HashSet<ILSMIndex>();
// List of operation callbacks corresponding to the operand indexes. In particular, needed to track
// the number of active operations contributed by this transaction.
- private final List<AbstractOperationCallback> callbacks = new ArrayList<AbstractOperationCallback>();
+ private final Set<AbstractOperationCallback> callbacks = new HashSet<AbstractOperationCallback>();
public TransactionContext(JobId jobId, TransactionSubsystem transactionSubsystem) throws ACIDException {
this.jobId = jobId;
@@ -85,19 +84,40 @@
public void updateLastLSNForIndexes(long lastLSN) {
synchronized (indexes) {
for (ILSMIndex index : indexes) {
- ((IndexOperationTracker) index.getOperationTracker()).updateLastLSN(lastLSN);
+ ((BaseOperationTracker) index.getOperationTracker()).updateLastLSN(lastLSN);
}
}
}
public void decreaseActiveTransactionCountOnIndexes() throws HyracksDataException {
synchronized (indexes) {
- for (int i = 0; i < indexes.size(); i++) {
- ILSMIndex index = indexes.get(i);
- IModificationOperationCallback modificationCallback = (IModificationOperationCallback) callbacks.get(i);
- ((IndexOperationTracker) index.getOperationTracker()).completeOperation(LSMOperationType.MODIFICATION,
- null, modificationCallback);
+ Set<BaseOperationTracker> opTrackers = new HashSet<BaseOperationTracker>();
+ Iterator<ILSMIndex> indexIt = indexes.iterator();
+ Iterator<AbstractOperationCallback> cbIt = callbacks.iterator();
+ while (indexIt.hasNext()) {
+ ILSMIndex index = indexIt.next();
+ opTrackers.add((BaseOperationTracker) index.getOperationTracker());
+ assert cbIt.hasNext();
}
+ Iterator<BaseOperationTracker> trackerIt = opTrackers.iterator();
+ while (trackerIt.hasNext()) {
+ IModificationOperationCallback modificationCallback = (IModificationOperationCallback) cbIt.next();
+ ((BaseOperationTracker) trackerIt.next()).completeOperation(null, LSMOperationType.MODIFICATION, null,
+ modificationCallback);
+ }
+ }
+ }
+
+ @Override
+ public int getActiveOperationCountOnIndexes() throws HyracksDataException {
+ synchronized (indexes) {
+ int count = 0;
+ Iterator<AbstractOperationCallback> cbIt = callbacks.iterator();
+ while (cbIt.hasNext()) {
+ IModificationOperationCallback modificationCallback = (IModificationOperationCallback) cbIt.next();
+ count += ((AbstractOperationCallback) modificationCallback).getLocalNumActiveOperations();
+ }
+ return count;
}
}
@@ -172,5 +192,4 @@
return (o == this);
}
-
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index 4e8808f..c956367 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -26,6 +26,7 @@
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
/**
* An implementation of the @see ITransactionManager interface that provides
@@ -105,15 +106,22 @@
//for entity-level commit
if (PKHashVal != -1) {
- transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext, true);
- /*****************************
- try {
- //decrease the transaction reference count on index
- txnContext.decreaseActiveTransactionCountOnIndexes();
- } catch (HyracksDataException e) {
- throw new ACIDException("failed to complete index operation", e);
+ boolean countIsZero = transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext, true);
+ if (!countIsZero) {
+ // Lock count != 0 for a particular entity implies that the entity has been locked
+ // more than once (probably due to a hash collision in our current model).
+ // It is safe to decrease the active transaction count on indexes since,
+ // by virtue of the counter not being zero, there is another transaction
+ // that has increased the transaction count. Thus, decreasing it will not
+ // allow the data to be flushed (yet). The flush will occur when the log page
+ // flush thread decides to decrease the count for the last time.
+ try {
+ //decrease the transaction reference count on index
+ txnContext.decreaseActiveTransactionCountOnIndexes();
+ } catch (HyracksDataException e) {
+ throw new ACIDException("failed to complete index operation", e);
+ }
}
- *****************************/
return;
}
@@ -151,11 +159,11 @@
public TransactionSubsystem getTransactionProvider() {
return transactionProvider;
}
-
+
public void setMaxJobId(int jobId) {
maxJobId.set(Math.max(maxJobId.get(), jobId));
}
-
+
public int getMaxJobId() {
return maxJobId.get();
}
diff --git a/pom.xml b/pom.xml
index aa0bc82..c4e50c1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,7 +50,7 @@
</property>
</activation>
<properties>
- <test.heap.size>2047</test.heap.size>
+ <test.heap.size>2048</test.heap.size>
</properties>
</profile>