ASTERIXDB-1337: Dataset Memory Management on Multi-Partition NC
As sugggested in ASTERIXDB-1337, this change is to maintain a per-
partition MultitenantVirtualBufferCache budget, as opposed to sharing
the budget across the dataset.
Change-Id: Ibbf08f532c1210c30be6a51c73570a789174213b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/705
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
index 5532e79..c58462f 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
@@ -21,7 +21,6 @@
import java.io.IOException;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
-import java.util.List;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -77,7 +76,6 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
-import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.am.lsm.common.impls.AsynchronousScheduler;
import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicyFactory;
import org.apache.hyracks.storage.common.buffercache.BufferCache;
@@ -175,7 +173,7 @@
localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory.createRepository();
- IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProdiverForRecovery(
+ IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProviderForRecovery(
this);
txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider,
txnProperties);
@@ -189,7 +187,8 @@
initializeResourceIdFactory();
datasetLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository,
- MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID, txnSubsystem.getLogManager());
+ MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID, txnSubsystem.getLogManager(),
+ ioManager.getIODevices().size());
isShuttingdown = false;
@@ -361,11 +360,6 @@
}
@Override
- public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID) {
- return datasetLifecycleManager.getVirtualBufferCaches(datasetID);
- }
-
- @Override
public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) {
return datasetLifecycleManager.getOperationTracker(datasetID);
}
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
similarity index 89%
rename from asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java
rename to asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
index b975970..7ac5036 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.api.common;
-import java.util.List;
-
import org.apache.asterix.common.api.AsterixThreadExecutor;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
@@ -28,17 +26,16 @@
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
-import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.file.IFileMapProvider;
import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
import org.apache.hyracks.storage.common.file.IResourceIdFactory;
-public class AsterixAppRuntimeContextProdiverForRecovery implements IAsterixAppRuntimeContextProvider {
+public class AsterixAppRuntimeContextProviderForRecovery implements IAsterixAppRuntimeContextProvider {
private final AsterixAppRuntimeContext asterixAppRuntimeContext;
- public AsterixAppRuntimeContextProdiverForRecovery(AsterixAppRuntimeContext asterixAppRuntimeContext) {
+ public AsterixAppRuntimeContextProviderForRecovery(AsterixAppRuntimeContext asterixAppRuntimeContext) {
this.asterixAppRuntimeContext = asterixAppRuntimeContext;
}
@@ -88,11 +85,6 @@
}
@Override
- public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID) {
- return asterixAppRuntimeContext.getVirtualBufferCaches(datasetID);
- }
-
- @Override
public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) {
return asterixAppRuntimeContext.getLSMBTreeOperationTracker(datasetID);
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
index 496a10e..ee9ed4a 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.rmi.RemoteException;
-import java.util.List;
import java.util.concurrent.Executor;
import org.apache.asterix.common.exceptions.ACIDException;
@@ -35,7 +34,6 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
-import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.file.IFileMapProvider;
import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
@@ -75,8 +73,6 @@
public double getBloomFilterFalsePositiveRate();
- public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID);
-
public Object getFeedManager();
public IRemoteRecoveryManager getRemoteRecoveryManager();
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 3b4617a..552ce22 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -81,9 +81,10 @@
* creates (if necessary) and returns the dataset virtual buffer caches.
*
* @param datasetID
+ * @param ioDeviceNum
* @return
*/
- List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID);
+ List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID, int ioDeviceNum);
/**
* Flushes then closes all open datasets
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/ILocalResourceMetadata.java b/asterix-common/src/main/java/org/apache/asterix/common/api/ILocalResourceMetadata.java
index cdd05e5..6af7441 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/ILocalResourceMetadata.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/ILocalResourceMetadata.java
@@ -27,7 +27,7 @@
public interface ILocalResourceMetadata extends Serializable {
public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider runtimeContextProvider, String filePath,
- int partition) throws HyracksDataException;
+ int partition, int ioDeviceNum) throws HyracksDataException;
public int getDatasetID();
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/AsterixVirtualBufferCacheProvider.java b/asterix-common/src/main/java/org/apache/asterix/common/context/AsterixVirtualBufferCacheProvider.java
index c58b013..12faad2 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/AsterixVirtualBufferCacheProvider.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/AsterixVirtualBufferCacheProvider.java
@@ -22,6 +22,7 @@
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
@@ -35,9 +36,11 @@
}
@Override
- public List<IVirtualBufferCache> getVirtualBufferCaches(IHyracksTaskContext ctx) {
+ public List<IVirtualBufferCache> getVirtualBufferCaches(IHyracksTaskContext ctx, IFileSplitProvider fileSplitProvider) {
+ final int partition = ctx.getTaskAttemptId().getTaskId().getPartition();
+ final int ioDeviceNum = fileSplitProvider.getFileSplits()[partition].getIODeviceId();
return ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
- .getVirtualBufferCaches(datasetID);
+ .getDatasetLifecycleManager().getVirtualBufferCaches(datasetID, ioDeviceNum);
}
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index eb03015..8f9b5b0 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -53,7 +53,7 @@
public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeCycleComponent {
private final AsterixStorageProperties storageProperties;
- private final Map<Integer, List<IVirtualBufferCache>> datasetVirtualBufferCaches;
+ private final Map<Integer, DatasetVirtualBufferCaches> datasetVirtualBufferCachesMap;
private final Map<Integer, ILSMOperationTracker> datasetOpTrackers;
private final Map<Integer, DatasetInfo> datasetInfos;
private final ILocalResourceRepository resourceRepository;
@@ -62,14 +62,17 @@
private long used;
private final ILogManager logManager;
private final LogRecord logRecord;
+ private final int numPartitions;
public DatasetLifecycleManager(AsterixStorageProperties storageProperties,
- ILocalResourceRepository resourceRepository, int firstAvilableUserDatasetID, ILogManager logManager) {
+ ILocalResourceRepository resourceRepository, int firstAvilableUserDatasetID,
+ ILogManager logManager, int numPartitions) {
this.logManager = logManager;
this.storageProperties = storageProperties;
this.resourceRepository = resourceRepository;
this.firstAvilableUserDatasetID = firstAvilableUserDatasetID;
- datasetVirtualBufferCaches = new HashMap<Integer, List<IVirtualBufferCache>>();
+ this.numPartitions = numPartitions;
+ datasetVirtualBufferCachesMap = new HashMap<>();
datasetOpTrackers = new HashMap<Integer, ILSMOperationTracker>();
datasetInfos = new HashMap<Integer, DatasetInfo>();
capacity = storageProperties.getMemoryComponentGlobalBudget();
@@ -305,45 +308,36 @@
return openIndexesInfo;
}
- @Override
- public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID) {
- synchronized (datasetVirtualBufferCaches) {
- List<IVirtualBufferCache> vbcs = datasetVirtualBufferCaches.get(datasetID);
+ private DatasetVirtualBufferCaches getVirtualBufferCaches(int datasetID) {
+ synchronized (datasetVirtualBufferCachesMap) {
+ DatasetVirtualBufferCaches vbcs = datasetVirtualBufferCachesMap.get(datasetID);
if (vbcs == null) {
- initializeDatasetVirtualBufferCache(datasetID);
- vbcs = datasetVirtualBufferCaches.get(datasetID);
- if (vbcs == null) {
- throw new IllegalStateException("Could not find dataset " + datasetID + " virtual buffer cache.");
- }
+ vbcs = initializeDatasetVirtualBufferCache(datasetID);
}
return vbcs;
}
}
+ @Override
+ public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID, int ioDeviceNum) {
+ DatasetVirtualBufferCaches dvbcs = getVirtualBufferCaches(datasetID);
+ return dvbcs.getVirtualBufferCaches(ioDeviceNum);
+ }
+
private void removeDatasetFromCache(int datasetID) throws HyracksDataException {
deallocateDatasetMemory(datasetID);
datasetInfos.remove(datasetID);
- datasetVirtualBufferCaches.remove(datasetID);
+ datasetVirtualBufferCachesMap.remove(datasetID);
datasetOpTrackers.remove(datasetID);
}
- private void initializeDatasetVirtualBufferCache(int datasetID) {
- List<IVirtualBufferCache> vbcs = new ArrayList<IVirtualBufferCache>();
- synchronized (datasetVirtualBufferCaches) {
- int numPages = datasetID < firstAvilableUserDatasetID
- ? storageProperties.getMetadataMemoryComponentNumPages()
- : storageProperties.getMemoryComponentNumPages();
- for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
- MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(
- new VirtualBufferCache(new ResourceHeapBufferAllocator(this, Integer.toString(datasetID)),
- storageProperties.getMemoryComponentPageSize(),
- numPages / storageProperties.getMemoryComponentsNum()));
- vbcs.add(vbc);
- }
- datasetVirtualBufferCaches.put(datasetID, vbcs);
+ private DatasetVirtualBufferCaches initializeDatasetVirtualBufferCache(int datasetID) {
+ synchronized (datasetVirtualBufferCachesMap) {
+ DatasetVirtualBufferCaches dvbcs = new DatasetVirtualBufferCaches(datasetID);
+ datasetVirtualBufferCachesMap.put(datasetID, dvbcs);
+ return dvbcs;
}
}
-
@Override
public ILSMOperationTracker getOperationTracker(int datasetID) {
synchronized (datasetOpTrackers) {
@@ -356,7 +350,7 @@
}
}
- private abstract class Info {
+ private static abstract class Info {
protected int referenceCount;
protected boolean isOpen;
@@ -374,7 +368,7 @@
}
}
- public class IndexInfo extends Info {
+ public static class IndexInfo extends Info {
private final ILSMIndex index;
private final long resourceId;
private final int datasetId;
@@ -398,7 +392,7 @@
}
}
- public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
+ public static class DatasetInfo extends Info implements Comparable<DatasetInfo> {
private final Map<Long, IndexInfo> indexes;
private final int datasetID;
private long lastAccess;
@@ -639,7 +633,7 @@
closeAllDatasets();
- datasetVirtualBufferCaches.clear();
+ datasetVirtualBufferCachesMap.clear();
datasetOpTrackers.clear();
datasetInfos.clear();
}
@@ -687,11 +681,7 @@
synchronized (dsInfo) {
// This is not needed for external datasets' indexes since they never use the virtual buffer cache.
if (!dsInfo.memoryAllocated && !dsInfo.isExternal) {
- List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(dsInfo.datasetID);
- long additionalSize = 0;
- for (IVirtualBufferCache vbc : vbcs) {
- additionalSize += vbc.getNumPages() * vbc.getPageSize();
- }
+ long additionalSize = getVirtualBufferCaches(dsInfo.datasetID).getTotalSize();
while (used + additionalSize > capacity) {
if (!evictCandidateDataset()) {
throw new HyracksDataException("Cannot allocate dataset " + dsInfo.datasetID
@@ -712,10 +702,7 @@
}
synchronized (dsInfo) {
if (dsInfo.isOpen && dsInfo.memoryAllocated) {
- List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(dsInfo.datasetID);
- for (IVirtualBufferCache vbc : vbcs) {
- used -= (vbc.getNumPages() * vbc.getPageSize());
- }
+ used -= getVirtualBufferCaches(dsInfo.datasetID).getTotalSize();
dsInfo.memoryAllocated = false;
}
}
@@ -727,4 +714,49 @@
int did = Integer.parseInt(resourcePath);
allocateDatasetMemory(did);
}
+
+ private class DatasetVirtualBufferCaches {
+ private final int datasetID;
+ private final Map<Integer, List<IVirtualBufferCache>> ioDeviceVirtualBufferCaches = new HashMap<>();
+
+ public DatasetVirtualBufferCaches(int datasetID) {
+ this.datasetID = datasetID;
+ }
+
+ private List<IVirtualBufferCache> initializeVirtualBufferCaches(int ioDeviceNum) {
+ assert ioDeviceVirtualBufferCaches.size() < numPartitions;
+ int numPages = datasetID < firstAvilableUserDatasetID
+ ? storageProperties.getMetadataMemoryComponentNumPages()
+ : storageProperties.getMemoryComponentNumPages();
+ List<IVirtualBufferCache> vbcs = new ArrayList<>();
+ for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
+ MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(
+ new VirtualBufferCache(new ResourceHeapBufferAllocator(DatasetLifecycleManager.this,
+ Integer.toString(datasetID)), storageProperties.getMemoryComponentPageSize(),
+ numPages / storageProperties.getMemoryComponentsNum() / numPartitions));
+ vbcs.add(vbc);
+ }
+ ioDeviceVirtualBufferCaches.put(ioDeviceNum, vbcs);
+ return vbcs;
+ }
+
+ public List<IVirtualBufferCache> getVirtualBufferCaches(int ioDeviceNum) {
+ synchronized (ioDeviceVirtualBufferCaches) {
+ List<IVirtualBufferCache> vbcs = ioDeviceVirtualBufferCaches.get(ioDeviceNum);
+ if (vbcs == null) {
+ vbcs = initializeVirtualBufferCaches(ioDeviceNum);
+ }
+ return vbcs;
+ }
+ }
+
+ public long getTotalSize() {
+ int numPages = datasetID < firstAvilableUserDatasetID
+ ? storageProperties.getMetadataMemoryComponentNumPages()
+ : storageProperties.getMemoryComponentNumPages();
+
+ return storageProperties.getMemoryComponentPageSize() * numPages;
+ }
+ }
+
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
index 6382af9..d0dd3fa 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
@@ -18,15 +18,12 @@
*/
package org.apache.asterix.common.transactions;
-import java.util.List;
-
import org.apache.asterix.common.api.AsterixThreadExecutor;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
-import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.file.IFileMapProvider;
import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
@@ -56,7 +53,5 @@
public IIOManager getIOManager();
- public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID);
-
public IAsterixAppRuntimeContext getAppContext();
}
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index b83f2f3..07d0364 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -349,8 +349,8 @@
String resourceName = metadataPartitionPath + File.separator + index.getFileNameRelativePath();
FileReference file = ioManager.getAbsoluteFileRef(metadataDeviceId, resourceName);
- List<IVirtualBufferCache> virtualBufferCaches = runtimeContext
- .getVirtualBufferCaches(index.getDatasetId().getId());
+ List<IVirtualBufferCache> virtualBufferCaches = runtimeContext.getDatasetLifecycleManager()
+ .getVirtualBufferCaches(index.getDatasetId().getId(), metadataPartition.getIODeviceNum());
ITypeTraits[] typeTraits = index.getTypeTraits();
IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
int[] bloomFilterKeyFields = index.getBloomFilterKeyFields();
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java
index a75428f..33b7f30 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java
@@ -45,7 +45,7 @@
@Override
public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider runtimeContextProvider, String filePath,
- int partition) {
+ int partition, int ioDeviceNum) {
FileReference file = new FileReference(new File(filePath));
LSMBTree lsmBTree = LSMBTreeUtils.createExternalBTree(file, runtimeContextProvider.getBufferCache(),
runtimeContextProvider.getFileMapManager(), typeTraits, cmpFactories, bloomFilterKeyFields,
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java
index 7b487d4..04006f1 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java
@@ -58,7 +58,7 @@
@Override
public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider runtimeContextProvider, String filePath,
- int partition) throws HyracksDataException {
+ int partition, int ioDeviceNum) throws HyracksDataException {
FileReference file = new FileReference(new File(filePath));
return LSMBTreeUtils.createExternalBTreeWithBuddy(file, runtimeContextProvider.getBufferCache(),
runtimeContextProvider.getFileMapManager(), typeTraits, btreeCmpFactories,
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java
index 89a5165..ed468b9 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java
@@ -54,7 +54,7 @@
@Override
public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider runtimeContextProvider, String filePath,
- int partition) throws HyracksDataException {
+ int partition, int ioDeviceNum) throws HyracksDataException {
FileReference file = new FileReference(new File(filePath));
try {
return LSMRTreeUtils.createExternalRTree(file, runtimeContextProvider.getBufferCache(),
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
index 88e95dd..b1db1f3 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
@@ -19,9 +19,9 @@
package org.apache.asterix.transaction.management.resource;
import java.io.File;
-import java.util.List;
import java.util.Map;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.context.BaseOperationTracker;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
@@ -32,7 +32,6 @@
import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
-import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
public class LSMBTreeLocalResourceMetadata extends AbstractLSMLocalResourceMetadata {
@@ -62,17 +61,15 @@
@Override
public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider runtimeContextProvider, String filePath,
- int partition) {
+ int partition, int ioDeviceNum) {
FileReference file = new FileReference(new File(filePath));
- List<IVirtualBufferCache> virtualBufferCaches = runtimeContextProvider.getVirtualBufferCaches(datasetID);
- LSMBTree lsmBTree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file,
- runtimeContextProvider.getBufferCache(), runtimeContextProvider.getFileMapManager(), typeTraits,
- cmpFactories, bloomFilterKeyFields, runtimeContextProvider.getBloomFilterFalsePositiveRate(),
- mergePolicyFactory.createMergePolicy(mergePolicyProperties,
- runtimeContextProvider.getDatasetLifecycleManager()),
+ final IDatasetLifecycleManager datasetLifecycleManager = runtimeContextProvider.getDatasetLifecycleManager();
+ LSMBTree lsmBTree = LSMBTreeUtils.createLSMTree(datasetLifecycleManager.getVirtualBufferCaches(datasetID,
+ ioDeviceNum), file, runtimeContextProvider.getBufferCache(), runtimeContextProvider.getFileMapManager(),
+ typeTraits, cmpFactories, bloomFilterKeyFields, runtimeContextProvider.getBloomFilterFalsePositiveRate(),
+ mergePolicyFactory.createMergePolicy(mergePolicyProperties, datasetLifecycleManager),
isPrimary ? runtimeContextProvider.getLSMBTreeOperationTracker(datasetID)
- : new BaseOperationTracker(datasetID,
- runtimeContextProvider.getDatasetLifecycleManager().getDatasetInfo(datasetID)),
+ : new BaseOperationTracker(datasetID, datasetLifecycleManager.getDatasetInfo(datasetID)),
runtimeContextProvider.getLSMIOScheduler(),
LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), isPrimary, filterTypeTraits,
filterCmpFactories, btreeFields, filterFields, true);
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
index c087b56e..87319b4 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
@@ -73,8 +73,9 @@
@Override
public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider runtimeContextProvider, String filePath,
- int partition) throws HyracksDataException {
- List<IVirtualBufferCache> virtualBufferCaches = runtimeContextProvider.getVirtualBufferCaches(datasetID);
+ int partition, int ioDeviceNum) throws HyracksDataException {
+ List<IVirtualBufferCache> virtualBufferCaches = runtimeContextProvider.getDatasetLifecycleManager()
+ .getVirtualBufferCaches(datasetID, ioDeviceNum);
try {
if (isPartitioned) {
return InvertedIndexUtils.createPartitionedLSMInvertedIndex(virtualBufferCaches,
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
index 7a43849..bae36d4 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
@@ -74,9 +74,10 @@
@Override
public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider runtimeContextProvider, String filePath,
- int partition) throws HyracksDataException {
+ int partition, int ioDeviceNum) throws HyracksDataException {
FileReference file = new FileReference(new File(filePath));
- List<IVirtualBufferCache> virtualBufferCaches = runtimeContextProvider.getVirtualBufferCaches(datasetID);
+ List<IVirtualBufferCache> virtualBufferCaches = runtimeContextProvider.getDatasetLifecycleManager()
+ .getVirtualBufferCaches(datasetID, ioDeviceNum);
try {
return LSMRTreeUtils.createLSMTree(virtualBufferCaches, file, runtimeContextProvider.getBufferCache(),
runtimeContextProvider.getFileMapManager(), typeTraits, rtreeCmpFactories, btreeCmpFactories,
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index db7c0a0..3a1e729 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -419,7 +419,11 @@
*/
public String getPartitionPath(int partition) {
//currently each partition is replicated on the same IO device number on all NCs.
- return mountPoints[clusterPartitions.get(partition).getIODeviceNum()];
+ return mountPoints[getIODeviceNum(partition)];
+ }
+
+ public int getIODeviceNum(int partition) {
+ return clusterPartitions.get(partition).getIODeviceNum();
}
public Set<Integer> getActivePartitions() {
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
index 358913b..3a4a56a 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
@@ -18,9 +18,6 @@
*/
package org.apache.asterix.transaction.management.service.locking;
-import static org.mockito.Mockito.mock;
-
-import java.util.List;
import java.util.concurrent.Executors;
import org.apache.asterix.common.api.AsterixThreadExecutor;
@@ -31,12 +28,13 @@
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
-import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.file.IFileMapProvider;
import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
import org.apache.hyracks.storage.common.file.ResourceIdFactory;
+import static org.mockito.Mockito.mock;
+
class TestRuntimeContextProvider implements IAsterixAppRuntimeContextProvider {
AsterixThreadExecutor ate = new AsterixThreadExecutor(Executors.defaultThreadFactory());
@@ -98,11 +96,6 @@
}
@Override
- public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID) {
- throw new UnsupportedOperationException();
- }
-
- @Override
public IAsterixAppRuntimeContext getAppContext() {
throw new UnsupportedOperationException();
}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index c462b22..9a74a18 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -370,7 +370,8 @@
//#. create index instance and register to indexLifeCycleManager
localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
index = localResourceMetadata.createIndexInstance(appRuntimeContext,
- resourceAbsolutePath, localResource.getPartition());
+ resourceAbsolutePath, localResource.getPartition(),
+ localResourceRepository.getIODeviceNum(localResource.getPartition()));
datasetLifecycleManager.register(resourceAbsolutePath, index);
datasetLifecycleManager.open(resourceAbsolutePath);