Checkpoint, the code does not compile.
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
index 7463d1c..18a81e5 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
@@ -15,6 +15,7 @@
package edu.uci.ics.asterix.api.common;
import java.io.IOException;
+import java.util.List;
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
@@ -213,8 +214,8 @@
}
@Override
- public IVirtualBufferCache getVirtualBufferCache(int datasetID) {
- return indexLifecycleManager.getVirtualBufferCache(datasetID);
+ public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID) {
+ return indexLifecycleManager.getVirtualBufferCaches(datasetID);
}
@Override
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
index 712d993..439feff 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.asterix.api.common;
+import java.util.List;
+
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
@@ -88,8 +90,8 @@
}
@Override
- public IVirtualBufferCache getVirtualBufferCache(int datasetID) {
- return asterixAppRuntimeContext.getVirtualBufferCache(datasetID);
+ public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID) {
+ return asterixAppRuntimeContext.getVirtualBufferCaches(datasetID);
}
@Override
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixAppRuntimeContext.java
index d035303..4287212 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -15,6 +15,7 @@
package edu.uci.ics.asterix.common.api;
import java.io.IOException;
+import java.util.List;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
@@ -62,5 +63,5 @@
public double getBloomFilterFalsePositiveRate();
- public IVirtualBufferCache getVirtualBufferCache(int datasetID);
+ public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID);
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixVirtualBufferCacheProvider.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixVirtualBufferCacheProvider.java
index bd2828d..9efc9fd 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixVirtualBufferCacheProvider.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixVirtualBufferCacheProvider.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.asterix.common.context;
+import java.util.List;
+
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
@@ -29,9 +31,9 @@
}
@Override
- public IVirtualBufferCache getVirtualBufferCache(IHyracksTaskContext ctx) {
+ public List<IVirtualBufferCache> getVirtualBufferCaches(IHyracksTaskContext ctx) {
return ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
- .getVirtualBufferCache(datasetID);
+ .getVirtualBufferCaches(datasetID);
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
index 97b9346..a4d4daa 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
@@ -42,18 +42,19 @@
public class DatasetLifecycleManager implements IIndexLifecycleManager, ILifeCycleComponent {
private final AsterixStorageProperties storageProperties;
- private final Map<Integer, MultitenantVirtualBufferCache> datasetVirtualBufferCaches;
+ private final Map<Integer, List<IVirtualBufferCache>> datasetVirtualBufferCaches;
private final Map<Integer, ILSMOperationTracker> datasetOpTrackers;
private final Map<Integer, DatasetInfo> datasetInfos;
private final ILocalResourceRepository resourceRepository;
private final long capacity;
private long used;
+ private final static int NUM_MUTABLE_BUFFERS = 2;
public DatasetLifecycleManager(AsterixStorageProperties storageProperties,
ILocalResourceRepository resourceRepository) {
this.storageProperties = storageProperties;
this.resourceRepository = resourceRepository;
- datasetVirtualBufferCaches = new HashMap<Integer, MultitenantVirtualBufferCache>();
+ datasetVirtualBufferCaches = new HashMap<Integer, List<IVirtualBufferCache>>();
datasetOpTrackers = new HashMap<Integer, ILSMOperationTracker>();
datasetInfos = new HashMap<Integer, DatasetInfo>();
capacity = storageProperties.getMemoryComponentGlobalBudget();
@@ -113,9 +114,11 @@
}
if (dsInfo.referenceCount == 0 && dsInfo.isOpen && dsInfo.indexes.isEmpty()) {
- IVirtualBufferCache vbc = getVirtualBufferCache(did);
- assert vbc != null;
- used -= (vbc.getNumPages() * vbc.getPageSize());
+ List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(did);
+ assert vbcs != null;
+ for (IVirtualBufferCache vbc : vbcs) {
+ used -= (vbc.getNumPages() * vbc.getPageSize());
+ }
datasetInfos.remove(did);
}
@@ -137,9 +140,12 @@
}
if (!dsInfo.isOpen) {
- IVirtualBufferCache vbc = getVirtualBufferCache(did);
- assert vbc != null;
- long additionalSize = vbc.getNumPages() * vbc.getPageSize();
+ List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(did);
+ assert vbcs != null;
+ long additionalSize = 0;
+ for (IVirtualBufferCache vbc : vbcs) {
+ additionalSize += vbc.getNumPages() * vbc.getPageSize();
+ }
while (used + additionalSize > capacity) {
if (!evictCandidateDataset()) {
throw new HyracksDataException("Cannot activate index since memory budget would be exceeded.");
@@ -166,7 +172,7 @@
Collections.sort(datasetInfosList);
for (DatasetInfo dsInfo : datasetInfosList) {
ILSMOperationTracker opTracker = datasetOpTrackers.get(dsInfo.datasetID);
- if (opTracker != null && ((PrimaryIndexOperationTracker) opTracker).getNumActiveOperations() == 0
+ if (opTracker != null && ((PrimaryIndexOperationTracker) opTracker).isActiveDataset()
&& dsInfo.referenceCount == 0 && dsInfo.isOpen) {
for (IndexInfo iInfo : dsInfo.indexes.values()) {
if (iInfo.isOpen) {
@@ -176,8 +182,10 @@
assert iInfo.referenceCount == 0;
}
- IVirtualBufferCache vbc = getVirtualBufferCache(dsInfo.datasetID);
- used -= vbc.getNumPages() * vbc.getPageSize();
+ List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(dsInfo.datasetID);
+ for (IVirtualBufferCache vbc : vbcs) {
+ used -= vbc.getNumPages() * vbc.getPageSize();
+ }
dsInfo.isOpen = false;
return true;
}
@@ -213,15 +221,20 @@
return openIndexes;
}
- public IVirtualBufferCache getVirtualBufferCache(int datasetID) {
+ public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID) {
synchronized (datasetVirtualBufferCaches) {
- MultitenantVirtualBufferCache vbc = datasetVirtualBufferCaches.get(datasetID);
- if (vbc == null) {
- vbc = new MultitenantVirtualBufferCache(new VirtualBufferCache(new HeapBufferAllocator(),
- storageProperties.getMemoryComponentPageSize(), storageProperties.getMemoryComponentNumPages()));
- datasetVirtualBufferCaches.put(datasetID, vbc);
+ List<IVirtualBufferCache> vbcs = datasetVirtualBufferCaches.get(datasetID);
+ if (vbcs == null) {
+ vbcs = new ArrayList<IVirtualBufferCache>();
+ for (int i = 0; i < NUM_MUTABLE_BUFFERS; i++) {
+ MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(new VirtualBufferCache(
+ new HeapBufferAllocator(), storageProperties.getMemoryComponentPageSize(),
+ storageProperties.getMemoryComponentNumPages() / NUM_MUTABLE_BUFFERS));
+ vbcs.add(vbc);
+ }
+ datasetVirtualBufferCaches.put(datasetID, vbcs);
}
- return vbc;
+ return vbcs;
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
index 3347fcc..ef8748d 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -15,7 +15,9 @@
package edu.uci.ics.asterix.common.context;
+import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
@@ -30,27 +32,27 @@
public class PrimaryIndexOperationTracker extends BaseOperationTracker {
private final DatasetLifecycleManager datasetLifecycleManager;
- private final IVirtualBufferCache datasetBufferCache;
+ private final List<IVirtualBufferCache> datasetBufferCaches;
private final int datasetID;
// Number of active operations on a ILSMIndex instance.
- private int numActiveOperations;
+ private AtomicInteger[] numActiveOperations;
public PrimaryIndexOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID,
ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
super(ioOpCallbackFactory);
this.datasetLifecycleManager = datasetLifecycleManager;
- this.numActiveOperations = 0;
this.datasetID = datasetID;
- datasetBufferCache = datasetLifecycleManager.getVirtualBufferCache(datasetID);
+ this.datasetBufferCaches = datasetLifecycleManager.getVirtualBufferCaches(datasetID);
+ this.numActiveOperations = new AtomicInteger[datasetBufferCaches.size()];
+ for (int i = 0; i < numActiveOperations.length; i++) {
+ this.numActiveOperations[i] = new AtomicInteger(0);
+ }
}
@Override
- public synchronized void beforeOperation(ILSMIndex index, LSMOperationType opType,
- ISearchOperationCallback searchCallback, IModificationOperationCallback modificationCallback)
- throws HyracksDataException {
- if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
- numActiveOperations++;
- }
+ public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
+ IModificationOperationCallback modificationCallback) throws HyracksDataException {
+ numActiveOperations[index.getCurrentMutableComponentId()].incrementAndGet();
}
@Override
@@ -66,26 +68,16 @@
@Override
public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
- int nActiveOps;
- synchronized (this) {
- if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
- numActiveOperations--;
- }
- nActiveOps = numActiveOperations;
- }
+ int nActiveOps = numActiveOperations[index.getCurrentMutableComponentId()].decrementAndGet();
+
if (opType != LSMOperationType.FLUSH) {
flushIfFull(nActiveOps);
}
}
- private void flushIfFull(int nActiveOps) throws HyracksDataException {
+ private void flushIfFull(int componentId, int nActiveOps) throws HyracksDataException {
// If we need a flush, and this is the last completing operation, then schedule the flush.
- if (datasetBufferCache.isFull() && nActiveOps == 0) {
- synchronized (this) {
- if (numActiveOperations > 0) {
- return;
- }
- }
+ if (datasetBufferCaches.get(componentId).isFull() && nActiveOps == 0) {
Set<ILSMIndex> indexes = datasetLifecycleManager.getDatasetIndexes(datasetID);
for (ILSMIndex lsmIndex : indexes) {
ILSMIndexAccessor accessor = (ILSMIndexAccessor) lsmIndex.createAccessor(
@@ -97,13 +89,18 @@
}
public void exclusiveJobCommitted() throws HyracksDataException {
- synchronized (this) {
- numActiveOperations = 0;
+ for (int i = 0; i < numActiveOperations.length; i++) {
+ numActiveOperations[i].set(0);
+ flushIfFull(i, 0);
}
- flushIfFull(0);
}
- public synchronized int getNumActiveOperations() {
- return numActiveOperations;
+ public boolean isActiveDataset() {
+ for (int i = 0; i < numActiveOperations.length; i++) {
+ if (numActiveOperations[i].get() > 0) {
+ return false;
+ }
+ }
+ return true;
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
index 05ac025..2f522b9 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.asterix.common.transactions;
+import java.util.List;
+
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
@@ -58,5 +60,5 @@
public IIOManager getIOManager();
- public IVirtualBufferCache getVirtualBufferCache(int datasetID);
+ public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID);
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index aa976f8..82fee45 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -333,7 +333,8 @@
+ IndexFileNameUtil.prepareFileName(metadataStore + File.separator + index.getFileNameRelativePath(),
runtimeContext.getMetaDataIODeviceId());
FileReference file = new FileReference(new File(filePath));
- IVirtualBufferCache virtualBufferCache = runtimeContext.getVirtualBufferCache(index.getDatasetId().getId());
+ List<IVirtualBufferCache> virtualBufferCaches = runtimeContext.getVirtualBufferCaches(index.getDatasetId()
+ .getId());
ITypeTraits[] typeTraits = index.getTypeTraits();
IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
int[] bloomFilterKeyFields = index.getBloomFilterKeyFields();
@@ -344,7 +345,7 @@
ILSMOperationTracker opTracker = index.isPrimaryIndex() ? runtimeContext.getLSMBTreeOperationTracker(index
.getDatasetId().getId()) : new BaseOperationTracker(LSMBTreeIOOperationCallbackFactory.INSTANCE);
if (create) {
- lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCache, file, bufferCache, fileMapProvider, typeTraits,
+ lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider, typeTraits,
comparatorFactories, bloomFilterKeyFields, runtimeContext.getBloomFilterFalsePositiveRate(),
runtimeContext.getLSMMergePolicy(), opTracker, runtimeContext.getLSMIOScheduler(), rtcProvider);
lsmBtree.create();
@@ -361,7 +362,7 @@
resourceID = localResourceRepository.getResourceByName(file.getFile().getPath()).getResourceId();
lsmBtree = (LSMBTree) indexLifecycleManager.getIndex(resourceID);
if (lsmBtree == null) {
- lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCache, file, bufferCache, fileMapProvider,
+ lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider,
typeTraits, comparatorFactories, bloomFilterKeyFields,
runtimeContext.getBloomFilterFalsePositiveRate(), runtimeContext.getLSMMergePolicy(),
opTracker, runtimeContext.getLSMIOScheduler(),
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
index 8ce0174..cd894a1 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
@@ -15,6 +15,7 @@
package edu.uci.ics.asterix.transaction.management.resource;
import java.io.File;
+import java.util.List;
import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
@@ -49,8 +50,8 @@
public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider runtimeContextProvider, String filePath,
int partition) {
FileReference file = new FileReference(new File(filePath));
- IVirtualBufferCache virtualBufferCache = runtimeContextProvider.getVirtualBufferCache(datasetID);
- LSMBTree lsmBTree = LSMBTreeUtils.createLSMTree(virtualBufferCache, file, runtimeContextProvider
+ List<IVirtualBufferCache> virtualBufferCaches = runtimeContextProvider.getVirtualBufferCaches(datasetID);
+ LSMBTree lsmBTree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, runtimeContextProvider
.getBufferCache(), runtimeContextProvider.getFileMapManager(), typeTraits, cmpFactories,
bloomFilterKeyFields, runtimeContextProvider.getBloomFilterFalsePositiveRate(), runtimeContextProvider
.getLSMMergePolicy(), isPrimary ? runtimeContextProvider.getLSMBTreeOperationTracker(datasetID)