ASTERIXDB-1058: use ResourceHeapBufferAllocator for dataset memory components.
Change-Id: Ifd90fabc79e61f84370d415c38917b998db41466
Reviewed-on: https://asterix-gerrit.ics.uci.edu/481
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 86e6db5..e1e6d96 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -37,13 +37,6 @@
IIndex getIndex(int datasetID, long resourceID) throws HyracksDataException;
/**
- * Allocates the memory budget of the dataset in the virtual buffer cache.
- * @param datasetID
- * @throws HyracksDataException
- */
- void allocateDatasetMemory(int datasetID) throws HyracksDataException;
-
- /**
* Flushes all open datasets synchronously.
* @throws HyracksDataException
*/
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
index d78e2cb..776549a 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.common.context;
-import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
@@ -29,12 +28,10 @@
public class BaseOperationTracker implements ILSMOperationTracker {
- protected final IDatasetLifecycleManager datasetLifecycleManager;
protected final int datasetID;
protected final DatasetInfo dsInfo;
- public BaseOperationTracker(IDatasetLifecycleManager datasetLifecycleManager, int datasetID, DatasetInfo dsInfo) {
- this.datasetLifecycleManager = datasetLifecycleManager;
+ public BaseOperationTracker(int datasetID, DatasetInfo dsInfo) {
this.datasetID = datasetID;
this.dsInfo = dsInfo;
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index b2e7b69..20b07fa 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -46,7 +46,7 @@
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.MultitenantVirtualBufferCache;
import org.apache.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
-import org.apache.hyracks.storage.common.buffercache.HeapBufferAllocator;
+import org.apache.hyracks.storage.common.buffercache.ResourceHeapBufferAllocator;
import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
import org.apache.hyracks.storage.common.file.LocalResource;
@@ -185,14 +185,14 @@
DatasetInfo dsInfo = datasetInfos.get(did);
if (dsInfo == null || !dsInfo.isRegistered) {
- throw new HyracksDataException("Failed to open index with resource ID " + resourceID
- + " since it does not exist.");
+ throw new HyracksDataException(
+ "Failed to open index with resource ID " + resourceID + " since it does not exist.");
}
IndexInfo iInfo = dsInfo.indexes.get(resourceID);
if (iInfo == null) {
- throw new HyracksDataException("Failed to open index with resource ID " + resourceID
- + " since it does not exist.");
+ throw new HyracksDataException(
+ "Failed to open index with resource ID " + resourceID + " since it does not exist.");
}
if (!dsInfo.isOpen && !dsInfo.isExternal) {
initializeDatasetVirtualBufferCache(did);
@@ -314,12 +314,14 @@
private void initializeDatasetVirtualBufferCache(int datasetID) {
List<IVirtualBufferCache> vbcs = new ArrayList<IVirtualBufferCache>();
synchronized (datasetVirtualBufferCaches) {
- int numPages = datasetID < firstAvilableUserDatasetID ? storageProperties
- .getMetadataMemoryComponentNumPages() : storageProperties.getMemoryComponentNumPages();
+ int numPages = datasetID < firstAvilableUserDatasetID
+ ? storageProperties.getMetadataMemoryComponentNumPages()
+ : storageProperties.getMemoryComponentNumPages();
for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
- MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(new VirtualBufferCache(
- new HeapBufferAllocator(), storageProperties.getMemoryComponentPageSize(), numPages
- / storageProperties.getMemoryComponentsNum()));
+ MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(
+ new VirtualBufferCache(new ResourceHeapBufferAllocator(this, Integer.toString(datasetID)),
+ storageProperties.getMemoryComponentPageSize(),
+ numPages / storageProperties.getMemoryComponentsNum()));
vbcs.add(vbc);
}
datasetVirtualBufferCaches.put(datasetID, vbcs);
@@ -331,7 +333,7 @@
synchronized (datasetOpTrackers) {
ILSMOperationTracker opTracker = datasetOpTrackers.get(datasetID);
if (opTracker == null) {
- opTracker = new PrimaryIndexOperationTracker(this, datasetID, logManager, getDatasetInfo(datasetID));
+ opTracker = new PrimaryIndexOperationTracker(datasetID, logManager, getDatasetInfo(datasetID));
datasetOpTrackers.put(datasetID, opTracker);
}
return opTracker;
@@ -488,14 +490,15 @@
public synchronized void scheduleAsyncFlushForLaggingDatasets(long targetLSN) throws HyracksDataException {
//schedule flush for datasets with min LSN (Log Serial Number) < targetLSN
for (DatasetInfo dsInfo : datasetInfos.values()) {
- PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) getOperationTracker(dsInfo.datasetID);
+ PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) getOperationTracker(
+ dsInfo.datasetID);
synchronized (opTracker) {
for (IndexInfo iInfo : dsInfo.indexes.values()) {
AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.index
.getIOOperationCallback();
if (!(((AbstractLSMIndex) iInfo.index).isCurrentMutableComponentEmpty()
- || ioCallback.hasPendingFlush() || opTracker.isFlushLogCreated() || opTracker
- .isFlushOnExit())) {
+ || ioCallback.hasPendingFlush() || opTracker.isFlushLogCreated()
+ || opTracker.isFlushOnExit())) {
long firstLSN = ioCallback.getFirstLSN();
if (firstLSN < targetLSN) {
opTracker.setFlushOnExit(true);
@@ -616,8 +619,8 @@
sb.append("[Datasets]\n");
sb.append(String.format(dsHeaderFormat, "DatasetID", "Open", "Reference Count", "Last Access"));
for (DatasetInfo dsInfo : datasetInfos.values()) {
- sb.append(String
- .format(dsFormat, dsInfo.datasetID, dsInfo.isOpen, dsInfo.referenceCount, dsInfo.lastAccess));
+ sb.append(
+ String.format(dsFormat, dsInfo.datasetID, dsInfo.isOpen, dsInfo.referenceCount, dsInfo.lastAccess));
}
sb.append("\n");
@@ -626,20 +629,19 @@
for (DatasetInfo dsInfo : datasetInfos.values()) {
for (Map.Entry<Long, IndexInfo> entry : dsInfo.indexes.entrySet()) {
IndexInfo iInfo = entry.getValue();
- sb.append(String.format(idxFormat, dsInfo.datasetID, entry.getKey(), iInfo.isOpen,
- iInfo.referenceCount, iInfo.index));
+ sb.append(String.format(idxFormat, dsInfo.datasetID, entry.getKey(), iInfo.isOpen, iInfo.referenceCount,
+ iInfo.index));
}
}
outputStream.write(sb.toString().getBytes());
}
- @Override
- public synchronized void allocateDatasetMemory(int datasetId) throws HyracksDataException {
+ private synchronized void allocateDatasetMemory(int datasetId) throws HyracksDataException {
DatasetInfo dsInfo = datasetInfos.get(datasetId);
if (dsInfo == null) {
- throw new HyracksDataException("Failed to allocate memory for dataset with ID " + datasetId
- + " since it is not open.");
+ throw new HyracksDataException(
+ "Failed to allocate memory for dataset with ID " + datasetId + " since it is not open.");
}
synchronized (dsInfo) {
// This is not needed for external datasets' indexes since they never use the virtual buffer cache.
@@ -664,8 +666,8 @@
private synchronized void deallocateDatasetMemory(int datasetId) throws HyracksDataException {
DatasetInfo dsInfo = datasetInfos.get(datasetId);
if (dsInfo == null) {
- throw new HyracksDataException("Failed to deallocate memory for dataset with ID " + datasetId
- + " since it is not open.");
+ throw new HyracksDataException(
+ "Failed to deallocate memory for dataset with ID " + datasetId + " since it is not open.");
}
synchronized (dsInfo) {
if (dsInfo.isOpen && dsInfo.memoryAllocated) {
@@ -677,4 +679,11 @@
}
}
}
+
+ @Override
+ public synchronized void allocateMemory(String resourceName) throws HyracksDataException {
+ //a resource name in the case of DatasetLifecycleManager is a dataset id which is passed to the ResourceHeapBufferAllocator.
+ int did = Integer.parseInt(resourceName);
+ allocateDatasetMemory(did);
+ }
}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index c3d75b1..b4a3ac9 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -22,7 +22,6 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
@@ -49,9 +48,8 @@
private boolean flushOnExit = false;
private boolean flushLogCreated = false;
- public PrimaryIndexOperationTracker(IDatasetLifecycleManager datasetLifecycleManager, int datasetID,
- ILogManager logManager, DatasetInfo dsInfo) {
- super(datasetLifecycleManager, datasetID, dsInfo);
+ public PrimaryIndexOperationTracker(int datasetID, ILogManager logManager, DatasetInfo dsInfo) {
+ super(datasetID, dsInfo);
this.logManager = logManager;
this.numActiveOperations = new AtomicInteger();
}
@@ -60,9 +58,6 @@
public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
- if (!dsInfo.isMemoryAllocated()) {
- datasetLifecycleManager.allocateDatasetMemory(dsInfo.getDatasetID());
- }
incrementNumActiveOperations(modificationCallback);
} else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
dsInfo.declareActiveIOOperation();
@@ -81,7 +76,7 @@
@Override
public synchronized void completeOperation(ILSMIndex index, LSMOperationType opType,
ISearchOperationCallback searchCallback, IModificationOperationCallback modificationCallback)
- throws HyracksDataException {
+ throws HyracksDataException {
if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
decrementNumActiveOperations(modificationCallback);
if (numActiveOperations.get() == 0) {
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 0605b47..c068657 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -231,8 +231,8 @@
public static void insertInitialDataverses(MetadataTransactionContext mdTxnCtx) throws Exception {
String dataverseName = MetadataPrimaryIndexes.DATAVERSE_DATASET.getDataverseName();
String dataFormat = NonTaggedDataFormat.NON_TAGGED_DATA_FORMAT;
- MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dataFormat,
- IMetadataEntity.PENDING_NO_OP));
+ MetadataManager.INSTANCE.addDataverse(mdTxnCtx,
+ new Dataverse(dataverseName, dataFormat, IMetadataEntity.PENDING_NO_OP));
}
public static void insertInitialDatasets(MetadataTransactionContext mdTxnCtx) throws Exception {
@@ -269,8 +269,8 @@
getBuiltinTypes(types);
getMetadataTypes(types);
for (int i = 0; i < types.size(); i++) {
- MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, types.get(i).getTypeName(),
- types.get(i), false));
+ MetadataManager.INSTANCE.addDatatype(mdTxnCtx,
+ new Datatype(dataverseName, types.get(i).getTypeName(), types.get(i), false));
}
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Finished inserting initial datatypes.");
@@ -279,10 +279,11 @@
public static void insertInitialIndexes(MetadataTransactionContext mdTxnCtx) throws Exception {
for (int i = 0; i < secondaryIndexes.length; i++) {
- MetadataManager.INSTANCE.addIndex(mdTxnCtx, new Index(secondaryIndexes[i].getDataverseName(),
- secondaryIndexes[i].getIndexedDatasetName(), secondaryIndexes[i].getIndexName(), IndexType.BTREE,
- secondaryIndexes[i].getPartitioningExpr(), secondaryIndexes[i].getPartitioningExprType(), false,
- false, IMetadataEntity.PENDING_NO_OP));
+ MetadataManager.INSTANCE.addIndex(mdTxnCtx,
+ new Index(secondaryIndexes[i].getDataverseName(), secondaryIndexes[i].getIndexedDatasetName(),
+ secondaryIndexes[i].getIndexName(), IndexType.BTREE,
+ secondaryIndexes[i].getPartitioningExpr(), secondaryIndexes[i].getPartitioningExprType(),
+ false, false, IMetadataEntity.PENDING_NO_OP));
}
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Finished inserting initial indexes.");
@@ -382,31 +383,24 @@
+ IndexFileNameUtil.prepareFileName(metadataStore + File.separator + index.getFileNameRelativePath(),
runtimeContext.getMetaDataIODeviceId());
FileReference file = new FileReference(new File(filePath));
- List<IVirtualBufferCache> virtualBufferCaches = runtimeContext.getVirtualBufferCaches(index.getDatasetId()
- .getId());
+ List<IVirtualBufferCache> virtualBufferCaches = runtimeContext
+ .getVirtualBufferCaches(index.getDatasetId().getId());
ITypeTraits[] typeTraits = index.getTypeTraits();
IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
int[] bloomFilterKeyFields = index.getBloomFilterKeyFields();
LSMBTree lsmBtree = null;
long resourceID = -1;
- ILSMOperationTracker opTracker = index.isPrimaryIndex() ? runtimeContext.getLSMBTreeOperationTracker(index
- .getDatasetId().getId()) : new BaseOperationTracker(dataLifecycleManager,
- index.getDatasetId().getId(), dataLifecycleManager.getDatasetInfo(index
- .getDatasetId().getId()));
+ ILSMOperationTracker opTracker = index.isPrimaryIndex()
+ ? runtimeContext.getLSMBTreeOperationTracker(index.getDatasetId().getId())
+ : new BaseOperationTracker(index.getDatasetId().getId(),
+ dataLifecycleManager.getDatasetInfo(index.getDatasetId().getId()));
final String path = file.getFile().getPath();
if (create) {
- lsmBtree = LSMBTreeUtils.createLSMTree(
- virtualBufferCaches,
- file,
- bufferCache,
- fileMapProvider,
- typeTraits,
- comparatorFactories,
- bloomFilterKeyFields,
- runtimeContext.getBloomFilterFalsePositiveRate(),
- runtimeContext.getMetadataMergePolicyFactory().createMergePolicy(
- GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, dataLifecycleManager), opTracker,
- runtimeContext.getLSMIOScheduler(),
+ lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider, typeTraits,
+ comparatorFactories, bloomFilterKeyFields, runtimeContext.getBloomFilterFalsePositiveRate(),
+ runtimeContext.getMetadataMergePolicyFactory()
+ .createMergePolicy(GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, dataLifecycleManager),
+ opTracker, runtimeContext.getLSMIOScheduler(),
LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), index.isPrimaryIndex(),
null, null, null, null, true);
lsmBtree.create();
@@ -425,19 +419,14 @@
resourceID = resource.getResourceId();
lsmBtree = (LSMBTree) dataLifecycleManager.getIndex(resource.getResourceName());
if (lsmBtree == null) {
- lsmBtree = LSMBTreeUtils.createLSMTree(
- virtualBufferCaches,
- file,
- bufferCache,
- fileMapProvider,
- typeTraits,
- comparatorFactories,
- bloomFilterKeyFields,
+ lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider,
+ typeTraits, comparatorFactories, bloomFilterKeyFields,
runtimeContext.getBloomFilterFalsePositiveRate(),
runtimeContext.getMetadataMergePolicyFactory().createMergePolicy(
- GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, dataLifecycleManager), opTracker,
- runtimeContext.getLSMIOScheduler(), LSMBTreeIOOperationCallbackFactory.INSTANCE
- .createIOOperationCallback(), index.isPrimaryIndex(), null, null, null, null, true);
+ GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, dataLifecycleManager),
+ opTracker, runtimeContext.getLSMIOScheduler(),
+ LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), index.isPrimaryIndex(),
+ null, null, null, null, true);
dataLifecycleManager.register(path, lsmBtree);
}
}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
index 7b1ec04..4c96839 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
@@ -39,7 +39,7 @@
public ILSMOperationTracker getOperationTracker(IHyracksTaskContext ctx) {
IDatasetLifecycleManager dslcManager = ((IAsterixAppRuntimeContext) ctx
.getJobletContext().getApplicationContext().getApplicationObject()).getDatasetLifecycleManager();
- return new BaseOperationTracker(dslcManager, datasetID, dslcManager.getDatasetInfo(datasetID));
+ return new BaseOperationTracker(datasetID, dslcManager.getDatasetInfo(datasetID));
}
}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java
index 26f56b7..a75428f 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java
@@ -47,19 +47,14 @@
public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider runtimeContextProvider, String filePath,
int partition) {
FileReference file = new FileReference(new File(filePath));
- LSMBTree lsmBTree = LSMBTreeUtils.createExternalBTree(
- file,
- runtimeContextProvider.getBufferCache(),
- runtimeContextProvider.getFileMapManager(),
- typeTraits,
- cmpFactories,
- bloomFilterKeyFields,
+ LSMBTree lsmBTree = LSMBTreeUtils.createExternalBTree(file, runtimeContextProvider.getBufferCache(),
+ runtimeContextProvider.getFileMapManager(), typeTraits, cmpFactories, bloomFilterKeyFields,
runtimeContextProvider.getBloomFilterFalsePositiveRate(),
mergePolicyFactory.createMergePolicy(mergePolicyProperties,
runtimeContextProvider.getDatasetLifecycleManager()),
- new BaseOperationTracker(runtimeContextProvider.getDatasetLifecycleManager(),
- datasetID, runtimeContextProvider.getDatasetLifecycleManager()
- .getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
+ new BaseOperationTracker(datasetID,
+ runtimeContextProvider.getDatasetLifecycleManager().getDatasetInfo(datasetID)),
+ runtimeContextProvider.getLSMIOScheduler(),
LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), -1, true);
return lsmBTree;
}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java
index beb1bb8..7b487d4 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java
@@ -60,18 +60,14 @@
public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider runtimeContextProvider, String filePath,
int partition) throws HyracksDataException {
FileReference file = new FileReference(new File(filePath));
- return LSMBTreeUtils.createExternalBTreeWithBuddy(
- file,
- runtimeContextProvider.getBufferCache(),
- runtimeContextProvider.getFileMapManager(),
- typeTraits,
- btreeCmpFactories,
+ return LSMBTreeUtils.createExternalBTreeWithBuddy(file, runtimeContextProvider.getBufferCache(),
+ runtimeContextProvider.getFileMapManager(), typeTraits, btreeCmpFactories,
runtimeContextProvider.getBloomFilterFalsePositiveRate(),
mergePolicyFactory.createMergePolicy(mergePolicyProperties,
runtimeContextProvider.getDatasetLifecycleManager()),
- new BaseOperationTracker(runtimeContextProvider.getDatasetLifecycleManager(),
- datasetID, runtimeContextProvider.getDatasetLifecycleManager()
- .getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
+ new BaseOperationTracker(datasetID,
+ runtimeContextProvider.getDatasetLifecycleManager().getDatasetInfo(datasetID)),
+ runtimeContextProvider.getLSMIOScheduler(),
LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), buddyBtreeFields, -1,
true);
}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java
index b9f5af9..89a5165 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java
@@ -57,23 +57,16 @@
int partition) throws HyracksDataException {
FileReference file = new FileReference(new File(filePath));
try {
- return LSMRTreeUtils.createExternalRTree(
- file,
- runtimeContextProvider.getBufferCache(),
- runtimeContextProvider.getFileMapManager(),
- typeTraits,
- rtreeCmpFactories,
- btreeCmpFactories,
- valueProviderFactories,
- rtreePolicyType,
- runtimeContextProvider.getBloomFilterFalsePositiveRate(),
+ return LSMRTreeUtils.createExternalRTree(file, runtimeContextProvider.getBufferCache(),
+ runtimeContextProvider.getFileMapManager(), typeTraits, rtreeCmpFactories, btreeCmpFactories,
+ valueProviderFactories, rtreePolicyType, runtimeContextProvider.getBloomFilterFalsePositiveRate(),
mergePolicyFactory.createMergePolicy(mergePolicyProperties,
runtimeContextProvider.getDatasetLifecycleManager()),
- new BaseOperationTracker(runtimeContextProvider
- .getDatasetLifecycleManager(), datasetID, runtimeContextProvider
- .getDatasetLifecycleManager().getDatasetInfo(datasetID)), runtimeContextProvider
- .getLSMIOScheduler(), LSMRTreeIOOperationCallbackFactory.INSTANCE
- .createIOOperationCallback(), linearizeCmpFactory, btreeFields, -1, true);
+ new BaseOperationTracker(datasetID,
+ runtimeContextProvider.getDatasetLifecycleManager().getDatasetInfo(datasetID)),
+ runtimeContextProvider.getLSMIOScheduler(),
+ LSMRTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), linearizeCmpFactory,
+ btreeFields, -1, true);
} catch (TreeIndexException e) {
throw new HyracksDataException(e);
}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
index 0adcf53..e6e2b29 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
@@ -66,21 +66,15 @@
int partition) {
FileReference file = new FileReference(new File(filePath));
List<IVirtualBufferCache> virtualBufferCaches = runtimeContextProvider.getVirtualBufferCaches(datasetID);
- LSMBTree lsmBTree = LSMBTreeUtils.createLSMTree(
- virtualBufferCaches,
- file,
- runtimeContextProvider.getBufferCache(),
- runtimeContextProvider.getFileMapManager(),
- typeTraits,
- cmpFactories,
- bloomFilterKeyFields,
- runtimeContextProvider.getBloomFilterFalsePositiveRate(),
+ LSMBTree lsmBTree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file,
+ runtimeContextProvider.getBufferCache(), runtimeContextProvider.getFileMapManager(), typeTraits,
+ cmpFactories, bloomFilterKeyFields, runtimeContextProvider.getBloomFilterFalsePositiveRate(),
mergePolicyFactory.createMergePolicy(mergePolicyProperties,
runtimeContextProvider.getDatasetLifecycleManager()),
- isPrimary ? runtimeContextProvider.getLSMBTreeOperationTracker(datasetID) : new BaseOperationTracker(
- runtimeContextProvider.getDatasetLifecycleManager(), datasetID,
- runtimeContextProvider.getDatasetLifecycleManager()
- .getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
+ isPrimary ? runtimeContextProvider.getLSMBTreeOperationTracker(datasetID)
+ : new BaseOperationTracker(datasetID,
+ runtimeContextProvider.getDatasetLifecycleManager().getDatasetInfo(datasetID)),
+ runtimeContextProvider.getLSMIOScheduler(),
LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), isPrimary, filterTypeTraits,
filterCmpFactories, btreeFields, filterFields, true);
return lsmBTree;
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
index e635349..c087b56e 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
@@ -77,43 +77,28 @@
List<IVirtualBufferCache> virtualBufferCaches = runtimeContextProvider.getVirtualBufferCaches(datasetID);
try {
if (isPartitioned) {
- return InvertedIndexUtils.createPartitionedLSMInvertedIndex(
- virtualBufferCaches,
- runtimeContextProvider.getFileMapManager(),
- invListTypeTraits,
- invListCmpFactories,
- tokenTypeTraits,
- tokenCmpFactories,
- tokenizerFactory,
- runtimeContextProvider.getBufferCache(),
- filePath,
- runtimeContextProvider.getBloomFilterFalsePositiveRate(),
+ return InvertedIndexUtils.createPartitionedLSMInvertedIndex(virtualBufferCaches,
+ runtimeContextProvider.getFileMapManager(), invListTypeTraits, invListCmpFactories,
+ tokenTypeTraits, tokenCmpFactories, tokenizerFactory, runtimeContextProvider.getBufferCache(),
+ filePath, runtimeContextProvider.getBloomFilterFalsePositiveRate(),
mergePolicyFactory.createMergePolicy(mergePolicyProperties,
runtimeContextProvider.getDatasetLifecycleManager()),
- new BaseOperationTracker(runtimeContextProvider
- .getDatasetLifecycleManager(), datasetID,
- runtimeContextProvider.getDatasetLifecycleManager()
- .getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
+ new BaseOperationTracker(datasetID,
+ runtimeContextProvider.getDatasetLifecycleManager().getDatasetInfo(datasetID)),
+ runtimeContextProvider.getLSMIOScheduler(),
LSMInvertedIndexIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(),
invertedIndexFields, filterTypeTraits, filterCmpFactories, filterFields,
filterFieldsForNonBulkLoadOps, invertedIndexFieldsForNonBulkLoadOps, true);
} else {
- return InvertedIndexUtils.createLSMInvertedIndex(
- virtualBufferCaches,
- runtimeContextProvider.getFileMapManager(),
- invListTypeTraits,
- invListCmpFactories,
- tokenTypeTraits,
- tokenCmpFactories,
- tokenizerFactory,
- runtimeContextProvider.getBufferCache(),
- filePath,
- runtimeContextProvider.getBloomFilterFalsePositiveRate(),
+ return InvertedIndexUtils.createLSMInvertedIndex(virtualBufferCaches,
+ runtimeContextProvider.getFileMapManager(), invListTypeTraits, invListCmpFactories,
+ tokenTypeTraits, tokenCmpFactories, tokenizerFactory, runtimeContextProvider.getBufferCache(),
+ filePath, runtimeContextProvider.getBloomFilterFalsePositiveRate(),
mergePolicyFactory.createMergePolicy(mergePolicyProperties,
runtimeContextProvider.getDatasetLifecycleManager()),
- new BaseOperationTracker(runtimeContextProvider.getDatasetLifecycleManager(), datasetID,
- runtimeContextProvider.getDatasetLifecycleManager()
- .getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
+ new BaseOperationTracker(datasetID,
+ runtimeContextProvider.getDatasetLifecycleManager().getDatasetInfo(datasetID)),
+ runtimeContextProvider.getLSMIOScheduler(),
LSMInvertedIndexIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(),
invertedIndexFields, filterTypeTraits, filterCmpFactories, filterFields,
filterFieldsForNonBulkLoadOps, invertedIndexFieldsForNonBulkLoadOps, true);
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
index c99e052..7a43849 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
@@ -78,25 +78,16 @@
FileReference file = new FileReference(new File(filePath));
List<IVirtualBufferCache> virtualBufferCaches = runtimeContextProvider.getVirtualBufferCaches(datasetID);
try {
- return LSMRTreeUtils.createLSMTree(
- virtualBufferCaches,
- file,
- runtimeContextProvider.getBufferCache(),
- runtimeContextProvider.getFileMapManager(),
- typeTraits,
- rtreeCmpFactories,
- btreeCmpFactories,
- valueProviderFactories,
- rtreePolicyType,
- runtimeContextProvider.getBloomFilterFalsePositiveRate(),
+ return LSMRTreeUtils.createLSMTree(virtualBufferCaches, file, runtimeContextProvider.getBufferCache(),
+ runtimeContextProvider.getFileMapManager(), typeTraits, rtreeCmpFactories, btreeCmpFactories,
+ valueProviderFactories, rtreePolicyType, runtimeContextProvider.getBloomFilterFalsePositiveRate(),
mergePolicyFactory.createMergePolicy(mergePolicyProperties,
runtimeContextProvider.getDatasetLifecycleManager()),
- new BaseOperationTracker(runtimeContextProvider
- .getDatasetLifecycleManager(), datasetID, runtimeContextProvider
- .getDatasetLifecycleManager().getDatasetInfo(datasetID)), runtimeContextProvider
- .getLSMIOScheduler(), LSMRTreeIOOperationCallbackFactory.INSTANCE
- .createIOOperationCallback(), linearizeCmpFactory, rtreeFields, btreeFields,
- filterTypeTraits, filterCmpFactories, filterFields, true);
+ new BaseOperationTracker(datasetID,
+ runtimeContextProvider.getDatasetLifecycleManager().getDatasetInfo(datasetID)),
+ runtimeContextProvider.getLSMIOScheduler(),
+ LSMRTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), linearizeCmpFactory,
+ rtreeFields, btreeFields, filterTypeTraits, filterCmpFactories, filterFields, true);
} catch (TreeIndexException e) {
throw new HyracksDataException(e);
}